From 64c60145cf876560336accac4166c2243df7523c Mon Sep 17 00:00:00 2001 From: Kyle Schutt Date: Thu, 30 May 2019 18:27:50 -0400 Subject: [PATCH] Elasticsearch Scroll API (#1267) --- README.md | 15 +++++++++++++++ lib/searchkick/logging.rb | 4 +++- lib/searchkick/query.rb | 14 ++++++++++---- lib/searchkick/results.rb | 26 ++++++++++++++++++++++++++ test/scroll_test.rb | 57 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++ 5 files changed, 111 insertions(+), 5 deletions(-) create mode 100644 test/scroll_test.rb diff --git a/README.md b/README.md index f028258..fa25fea 100644 --- a/README.md +++ b/README.md @@ -229,6 +229,21 @@ View with will_paginate <%= will_paginate @products %> ``` +### Scroll API + +For large datasets, a scroll context is more efficient than deep pagination. Create a scroll context by passing in the desired keep-alive time and the number of records per page. + +Request the next set of records in the scroll context. You can either use the original search context or any subsequent record set to fetch the next batch of records. + +```ruby +products = Product.search "milk", per_page: 10, scroll: '1m' +while products.any? + # code ... + + products = products.scroll +end +``` + ### Partial Matches By default, results must match all words in the query. diff --git a/lib/searchkick/logging.rb b/lib/searchkick/logging.rb index 9e22454..1275806 100644 --- a/lib/searchkick/logging.rb +++ b/lib/searchkick/logging.rb @@ -167,7 +167,9 @@ module Searchkick # no easy way to tell which host the client will use host = Searchkick.client.transport.hosts.first - debug " #{color(name, YELLOW, true)} curl #{host[:protocol]}://#{host[:host]}:#{host[:port]}/#{CGI.escape(index)}#{type ? "/#{type.map { |t| CGI.escape(t) }.join(',')}" : ''}/_search?pretty -H 'Content-Type: application/json' -d '#{payload[:query][:body].to_json}'" + params = ['pretty'] + params << 'scroll=' + payload[:query][:scroll] if payload[:query][:scroll] + debug " #{color(name, YELLOW, true)} curl #{host[:protocol]}://#{host[:host]}:#{host[:port]}/#{CGI.escape(index)}#{type ? "/#{type.map { |t| CGI.escape(t) }.join(',')}" : ''}/_search?#{params.join('&')} -H 'Content-Type: application/json' -d '#{payload[:query][:body].to_json}'" end def request(event) diff --git a/lib/searchkick/query.rb b/lib/searchkick/query.rb index f1f92b2..4f17590 100644 --- a/lib/searchkick/query.rb +++ b/lib/searchkick/query.rb @@ -12,14 +12,14 @@ module Searchkick :took, :error, :model_name, :entry_name, :total_count, :total_entries, :current_page, :per_page, :limit_value, :padding, :total_pages, :num_pages, :offset_value, :offset, :previous_page, :prev_page, :next_page, :first_page?, :last_page?, - :out_of_range?, :hits, :response, :to_a, :first + :out_of_range?, :hits, :response, :to_a, :first, :scroll def initialize(klass, term = "*", **options) unknown_keywords = options.keys - [:aggs, :block, :body, :body_options, :boost, :boost_by, :boost_by_distance, :boost_by_recency, :boost_where, :conversions, :conversions_term, :debug, :emoji, :exclude, :execute, :explain, :fields, :highlight, :includes, :index_name, :indices_boost, :limit, :load, :match, :misspellings, :models, :model_includes, :offset, :operator, :order, :padding, :page, :per_page, :profile, - :request_params, :routing, :scope_results, :select, :similar, :smart_aggs, :suggest, :total_entries, :track, :type, :where] + :request_params, :routing, :scope_results, :select, :similar, :smart_aggs, :suggest, :total_entries, :track, :type, :where, :scroll] raise ArgumentError, "unknown keywords: #{unknown_keywords.join(", ")}" if unknown_keywords.any? term = term.to_s @@ -82,6 +82,7 @@ module Searchkick } params[:type] = @type if @type params[:routing] = @routing if @routing + params[:scroll] = @scroll if @scroll params.merge!(options[:request_params]) if options[:request_params] params end @@ -109,7 +110,9 @@ module Searchkick # no easy way to tell which host the client will use host = Searchkick.client.transport.hosts.first credentials = host[:user] || host[:password] ? "#{host[:user]}:#{host[:password]}@" : nil - "curl #{host[:protocol]}://#{credentials}#{host[:host]}:#{host[:port]}/#{CGI.escape(index)}#{type ? "/#{type.map { |t| CGI.escape(t) }.join(',')}" : ''}/_search?pretty -H 'Content-Type: application/json' -d '#{query[:body].to_json}'" + params = ['pretty'] + params << 'scroll=' + options[:scroll] if options[:scroll] + "curl #{host[:protocol]}://#{credentials}#{host[:host]}:#{host[:port]}/#{CGI.escape(index)}#{type ? "/#{type.map { |t| CGI.escape(t) }.join(',')}" : ''}/_search?#{params.join('&')} -H 'Content-Type: application/json' -d '#{query[:body].to_json}'" end def handle_response(response) @@ -129,7 +132,8 @@ module Searchkick scope_results: options[:scope_results], total_entries: options[:total_entries], index_mapping: @index_mapping, - suggest: options[:suggest] + suggest: options[:suggest], + scroll: options[:scroll] } if options[:debug] @@ -230,6 +234,7 @@ module Searchkick per_page = (options[:limit] || options[:per_page] || 10_000).to_i padding = [options[:padding].to_i, 0].max offset = options[:offset] || (page - 1) * per_page + padding + scroll = options[:scroll] # model and eager loading load = options[:load].nil? ? true : options[:load] @@ -519,6 +524,7 @@ module Searchkick @per_page = per_page @padding = padding @load = load + @scroll = scroll end def set_fields diff --git a/lib/searchkick/results.rb b/lib/searchkick/results.rb index f2ddeb4..cc18f0d 100644 --- a/lib/searchkick/results.rb +++ b/lib/searchkick/results.rb @@ -221,6 +221,32 @@ module Searchkick @options[:misspellings] end + def scroll_id + @response['_scroll_id'] + end + + def scroll + if scroll_id.present? && options[:scroll].nil? + raise Searchkick::Error, "Scroll error - scroll keepalive must be defined" + elsif scroll_id.nil? + raise Searchkick::Error, "Scroll error - a scroll id has not been provided" + else + begin + params = { + scroll: options[:scroll], + scroll_id: scroll_id + } + Searchkick::Results.new(@klass, Searchkick.client.scroll(params), @options) + rescue Elasticsearch::Transport::Transport::Errors::NotFound => e + if e.class.to_s =~ /NotFound/ && e.message =~ /search_context_missing_exception/i + raise Searchkick::Error, "Scroll error - a scroll id does not exist or has expired" + else + raise e + end + end + end + end + private def results_query(records, hits) diff --git a/test/scroll_test.rb b/test/scroll_test.rb new file mode 100644 index 0000000..46cc561 --- /dev/null +++ b/test/scroll_test.rb @@ -0,0 +1,57 @@ +require_relative "test_helper" + +class ScrollTest < Minitest::Test + def test_scroll + store_names ["Product A", "Product B", "Product C", "Product D", "Product E", "Product F"] + products = Product.search("product", order: {name: :asc}, scroll: '1m', per_page: 2) + assert_equal ["Product A", "Product B"], products.map(&:name) + assert_equal "product", products.entry_name + assert_equal "1m", products.options[:scroll] + assert_equal products.response["_scroll_id"], products.scroll_id + assert_equal 2, products.size + assert_equal 2, products.length + assert_equal 6, products.total_count + assert_equal 6, products.total_entries + assert products.any? + + # scroll for next 2 + products = products.scroll + assert_equal ["Product C", "Product D"], products.map(&:name) + assert_equal "product", products.entry_name + # scroll for next 2 + products = products.scroll + assert_equal ["Product E", "Product F"], products.map(&:name) + assert_equal "product", products.entry_name + # scroll exhausted + products = products.scroll + assert_equal [], products.map(&:name) + assert_equal "product", products.entry_name + end + + def test_scroll_body + store_names ["Product A", "Product B", "Product C", "Product D", "Product E", "Product F"] + products = Product.search("product", body: {query: {match_all: {}}, sort: [{name: "asc"}]}, scroll: '1m', per_page: 2) + assert_equal ["Product A", "Product B"], products.map(&:name) + assert_equal "product", products.entry_name + assert_equal "1m", products.options[:scroll] + assert_equal products.response["_scroll_id"], products.scroll_id + assert_equal 2, products.size + assert_equal 2, products.length + assert_equal 6, products.total_count + assert_equal 6, products.total_entries + assert products.any? + + # scroll for next 2 + products = products.scroll + assert_equal ["Product C", "Product D"], products.map(&:name) + assert_equal "product", products.entry_name + # scroll for next 2 + products = products.scroll + assert_equal ["Product E", "Product F"], products.map(&:name) + assert_equal "product", products.entry_name + # scroll exhausted + products = products.scroll + assert_equal [], products.map(&:name) + assert_equal "product", products.entry_name + end +end -- libgit2 0.21.0