Commit 64c60145cf876560336accac4166c2243df7523c

Authored by Kyle Schutt
Committed by Andrew Kane
1 parent a763e066

Elasticsearch Scroll API (#1267)

* adding scroll logic

* adding tests and readme

* cleaning up scroll test from old branch

* removing scroll id mutation and cleaning up tests
README.md
... ... @@ -229,6 +229,21 @@ View with will_paginate
229 229 <%= will_paginate @products %>
230 230 ```
231 231  
  232 +### Scroll API
  233 +
  234 +For large datasets, a scroll context is more efficient than deep pagination. Create a scroll context by passing in the desired keep-alive time and the number of records per page.
  235 +
  236 +Request the next set of records in the scroll context. You can either use the original search context or any subsequent record set to fetch the next batch of records.
  237 +
  238 +```ruby
  239 +products = Product.search "milk", per_page: 10, scroll: '1m'
  240 +while products.any?
  241 + # code ...
  242 +
  243 + products = products.scroll
  244 +end
  245 +```
  246 +
232 247 ### Partial Matches
233 248  
234 249 By default, results must match all words in the query.
... ...
lib/searchkick/logging.rb
... ... @@ -167,7 +167,9 @@ module Searchkick
167 167  
168 168 # no easy way to tell which host the client will use
169 169 host = Searchkick.client.transport.hosts.first
170   - debug " #{color(name, YELLOW, true)} curl #{host[:protocol]}://#{host[:host]}:#{host[:port]}/#{CGI.escape(index)}#{type ? "/#{type.map { |t| CGI.escape(t) }.join(',')}" : ''}/_search?pretty -H 'Content-Type: application/json' -d '#{payload[:query][:body].to_json}'"
  170 + params = ['pretty']
  171 + params << 'scroll=' + payload[:query][:scroll] if payload[:query][:scroll]
  172 + debug " #{color(name, YELLOW, true)} curl #{host[:protocol]}://#{host[:host]}:#{host[:port]}/#{CGI.escape(index)}#{type ? "/#{type.map { |t| CGI.escape(t) }.join(',')}" : ''}/_search?#{params.join('&')} -H 'Content-Type: application/json' -d '#{payload[:query][:body].to_json}'"
171 173 end
172 174  
173 175 def request(event)
... ...
lib/searchkick/query.rb
... ... @@ -12,14 +12,14 @@ module Searchkick
12 12 :took, :error, :model_name, :entry_name, :total_count, :total_entries,
13 13 :current_page, :per_page, :limit_value, :padding, :total_pages, :num_pages,
14 14 :offset_value, :offset, :previous_page, :prev_page, :next_page, :first_page?, :last_page?,
15   - :out_of_range?, :hits, :response, :to_a, :first
  15 + :out_of_range?, :hits, :response, :to_a, :first, :scroll
16 16  
17 17 def initialize(klass, term = "*", **options)
18 18 unknown_keywords = options.keys - [:aggs, :block, :body, :body_options, :boost,
19 19 :boost_by, :boost_by_distance, :boost_by_recency, :boost_where, :conversions, :conversions_term, :debug, :emoji, :exclude, :execute, :explain,
20 20 :fields, :highlight, :includes, :index_name, :indices_boost, :limit, :load,
21 21 :match, :misspellings, :models, :model_includes, :offset, :operator, :order, :padding, :page, :per_page, :profile,
22   - :request_params, :routing, :scope_results, :select, :similar, :smart_aggs, :suggest, :total_entries, :track, :type, :where]
  22 + :request_params, :routing, :scope_results, :select, :similar, :smart_aggs, :suggest, :total_entries, :track, :type, :where, :scroll]
23 23 raise ArgumentError, "unknown keywords: #{unknown_keywords.join(", ")}" if unknown_keywords.any?
24 24  
25 25 term = term.to_s
... ... @@ -82,6 +82,7 @@ module Searchkick
82 82 }
83 83 params[:type] = @type if @type
84 84 params[:routing] = @routing if @routing
  85 + params[:scroll] = @scroll if @scroll
85 86 params.merge!(options[:request_params]) if options[:request_params]
86 87 params
87 88 end
... ... @@ -109,7 +110,9 @@ module Searchkick
109 110 # no easy way to tell which host the client will use
110 111 host = Searchkick.client.transport.hosts.first
111 112 credentials = host[:user] || host[:password] ? "#{host[:user]}:#{host[:password]}@" : nil
112   - "curl #{host[:protocol]}://#{credentials}#{host[:host]}:#{host[:port]}/#{CGI.escape(index)}#{type ? "/#{type.map { |t| CGI.escape(t) }.join(',')}" : ''}/_search?pretty -H 'Content-Type: application/json' -d '#{query[:body].to_json}'"
  113 + params = ['pretty']
  114 + params << 'scroll=' + options[:scroll] if options[:scroll]
  115 + "curl #{host[:protocol]}://#{credentials}#{host[:host]}:#{host[:port]}/#{CGI.escape(index)}#{type ? "/#{type.map { |t| CGI.escape(t) }.join(',')}" : ''}/_search?#{params.join('&')} -H 'Content-Type: application/json' -d '#{query[:body].to_json}'"
113 116 end
114 117  
115 118 def handle_response(response)
... ... @@ -129,7 +132,8 @@ module Searchkick
129 132 scope_results: options[:scope_results],
130 133 total_entries: options[:total_entries],
131 134 index_mapping: @index_mapping,
132   - suggest: options[:suggest]
  135 + suggest: options[:suggest],
  136 + scroll: options[:scroll]
133 137 }
134 138  
135 139 if options[:debug]
... ... @@ -230,6 +234,7 @@ module Searchkick
230 234 per_page = (options[:limit] || options[:per_page] || 10_000).to_i
231 235 padding = [options[:padding].to_i, 0].max
232 236 offset = options[:offset] || (page - 1) * per_page + padding
  237 + scroll = options[:scroll]
233 238  
234 239 # model and eager loading
235 240 load = options[:load].nil? ? true : options[:load]
... ... @@ -519,6 +524,7 @@ module Searchkick
519 524 @per_page = per_page
520 525 @padding = padding
521 526 @load = load
  527 + @scroll = scroll
522 528 end
523 529  
524 530 def set_fields
... ...
lib/searchkick/results.rb
... ... @@ -221,6 +221,32 @@ module Searchkick
221 221 @options[:misspellings]
222 222 end
223 223  
  224 + def scroll_id
  225 + @response['_scroll_id']
  226 + end
  227 +
  228 + def scroll
  229 + if scroll_id.present? && options[:scroll].nil?
  230 + raise Searchkick::Error, "Scroll error - scroll keepalive must be defined"
  231 + elsif scroll_id.nil?
  232 + raise Searchkick::Error, "Scroll error - a scroll id has not been provided"
  233 + else
  234 + begin
  235 + params = {
  236 + scroll: options[:scroll],
  237 + scroll_id: scroll_id
  238 + }
  239 + Searchkick::Results.new(@klass, Searchkick.client.scroll(params), @options)
  240 + rescue Elasticsearch::Transport::Transport::Errors::NotFound => e
  241 + if e.class.to_s =~ /NotFound/ && e.message =~ /search_context_missing_exception/i
  242 + raise Searchkick::Error, "Scroll error - a scroll id does not exist or has expired"
  243 + else
  244 + raise e
  245 + end
  246 + end
  247 + end
  248 + end
  249 +
224 250 private
225 251  
226 252 def results_query(records, hits)
... ...
test/scroll_test.rb 0 → 100644
... ... @@ -0,0 +1,57 @@
  1 +require_relative "test_helper"
  2 +
  3 +class ScrollTest < Minitest::Test
  4 + def test_scroll
  5 + store_names ["Product A", "Product B", "Product C", "Product D", "Product E", "Product F"]
  6 + products = Product.search("product", order: {name: :asc}, scroll: '1m', per_page: 2)
  7 + assert_equal ["Product A", "Product B"], products.map(&:name)
  8 + assert_equal "product", products.entry_name
  9 + assert_equal "1m", products.options[:scroll]
  10 + assert_equal products.response["_scroll_id"], products.scroll_id
  11 + assert_equal 2, products.size
  12 + assert_equal 2, products.length
  13 + assert_equal 6, products.total_count
  14 + assert_equal 6, products.total_entries
  15 + assert products.any?
  16 +
  17 + # scroll for next 2
  18 + products = products.scroll
  19 + assert_equal ["Product C", "Product D"], products.map(&:name)
  20 + assert_equal "product", products.entry_name
  21 + # scroll for next 2
  22 + products = products.scroll
  23 + assert_equal ["Product E", "Product F"], products.map(&:name)
  24 + assert_equal "product", products.entry_name
  25 + # scroll exhausted
  26 + products = products.scroll
  27 + assert_equal [], products.map(&:name)
  28 + assert_equal "product", products.entry_name
  29 + end
  30 +
  31 + def test_scroll_body
  32 + store_names ["Product A", "Product B", "Product C", "Product D", "Product E", "Product F"]
  33 + products = Product.search("product", body: {query: {match_all: {}}, sort: [{name: "asc"}]}, scroll: '1m', per_page: 2)
  34 + assert_equal ["Product A", "Product B"], products.map(&:name)
  35 + assert_equal "product", products.entry_name
  36 + assert_equal "1m", products.options[:scroll]
  37 + assert_equal products.response["_scroll_id"], products.scroll_id
  38 + assert_equal 2, products.size
  39 + assert_equal 2, products.length
  40 + assert_equal 6, products.total_count
  41 + assert_equal 6, products.total_entries
  42 + assert products.any?
  43 +
  44 + # scroll for next 2
  45 + products = products.scroll
  46 + assert_equal ["Product C", "Product D"], products.map(&:name)
  47 + assert_equal "product", products.entry_name
  48 + # scroll for next 2
  49 + products = products.scroll
  50 + assert_equal ["Product E", "Product F"], products.map(&:name)
  51 + assert_equal "product", products.entry_name
  52 + # scroll exhausted
  53 + products = products.scroll
  54 + assert_equal [], products.map(&:name)
  55 + assert_equal "product", products.entry_name
  56 + end
  57 +end
... ...