From 929c4ce7ec8186bd92e6b0d1791e0b1dcb85506e Mon Sep 17 00:00:00 2001
From: Andrew Kane
Date: Wed, 11 Jan 2017 23:43:52 -0800
Subject: [PATCH] Added async reindex

---
 README.md                          |  35 +++++++++++++++++++++++++++++++----
 lib/searchkick.rb                  |  17 +++++++++++++++--
 lib/searchkick/bulk_reindex_job.rb |  12 ++++++++++++
 lib/searchkick/index.rb            | 100 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++-------------------------
 lib/searchkick/model.rb            |   3 ++-
 test/reindex_test.rb               |  17 +++++++++++++++++
 6 files changed, 152 insertions(+), 32 deletions(-)
 create mode 100644 lib/searchkick/bulk_reindex_job.rb

diff --git a/README.md b/README.md
index 28676cf..3bbf79c 100644
--- a/README.md
+++ b/README.md
@@ -1156,6 +1156,37 @@ Reindex and search with:
 Business.search "ice cream", routing: params[:city_id]
 ```
 
+## Large Data Sets
+
+### Background Reindexing [experimental, ActiveRecord only]
+
+For large data sets, you can use background jobs to parallelize reindexing.
+
+```ruby
+Product.reindex(async: true)
+# {index_name: "products_production_20170111210018065"}
+```
+
+Once the jobs complete, promote the new index with:
+
+```ruby
+Product.searchkick_index.promote(index_name)
+```
+
+You can optionally track the status with Redis:
+
+```ruby
+Searchkick.redis = Redis.new
+```
+
+And use:
+
+```ruby
+Searchkick.reindex_status(index_name)
+```
+
+For more tips, check out [Keeping Elasticsearch in Sync](https://www.elastic.co/blog/found-keeping-elasticsearch-in-sync).
+
 ## Advanced
 
 Prefer to use the [Elasticsearch DSL](https://www.elastic.co/guide/en/elasticsearch/reference/current/query-dsl-queries.html) but still want awesome features like zero-downtime reindexing?
@@ -1457,10 +1488,6 @@ Product.search "api", misspellings: {prefix_length: 2} # api, apt, no ahi
 Product.search "ah", misspellings: {prefix_length: 2} # ah, no aha
 ```
 
-## Large Data Sets
-
-For large data sets, check out [Keeping Elasticsearch in Sync](https://www.elastic.co/blog/found-keeping-elasticsearch-in-sync). Searchkick will make this easy in the future.
-
 ## Testing
 
 This section could use some love.
diff --git a/lib/searchkick.rb b/lib/searchkick.rb
index 46d4cf2..03be9fb 100644
--- a/lib/searchkick.rb
+++ b/lib/searchkick.rb
@@ -19,7 +19,10 @@ begin
 rescue LoadError
   # do nothing
 end
-require "searchkick/reindex_v2_job" if defined?(ActiveJob)
+if defined?(ActiveJob)
+  require "searchkick/bulk_reindex_job"
+  require "searchkick/reindex_v2_job"
+end
 
 module Searchkick
   class Error < StandardError; end
@@ -30,7 +33,7 @@ module Searchkick
   class ImportError < Error; end
 
   class << self
-    attr_accessor :search_method_name, :wordnet_path, :timeout, :models, :client_options
+    attr_accessor :search_method_name, :wordnet_path, :timeout, :models, :client_options, :redis
     attr_writer :client, :env, :search_timeout
     attr_reader :aws_credentials
   end
@@ -129,6 +132,16 @@ module Searchkick
     @client = nil # reset client
   end
 
+  def self.reindex_status(index_name)
+    if redis
+      batches_left = Searchkick::Index.new(index_name).batches_left
+      {
+        completed: batches_left == 0,
+        batches_left: batches_left
+      }
+    end
+  end
+
   # private
   def self.indexer
     Thread.current[:searchkick_indexer] ||= Searchkick::Indexer.new
diff --git a/lib/searchkick/bulk_reindex_job.rb b/lib/searchkick/bulk_reindex_job.rb
new file mode 100644
index 0000000..af73e8b
--- /dev/null
+++ b/lib/searchkick/bulk_reindex_job.rb
@@ -0,0 +1,12 @@
+module Searchkick
+  class BulkReindexJob < ActiveJob::Base
+    queue_as :searchkick
+
+    def perform(class_name:, record_ids: nil, index_name: nil, method_name: nil, batch_id: nil, min_id: nil, max_id: nil)
+      klass = class_name.constantize
+      index = index_name ? Searchkick::Index.new(index_name) : klass.searchkick_index
+      record_ids ||= min_id..max_id
+      index.import_scope(klass.where(klass.primary_key => record_ids), method_name: method_name, batch: true, batch_id: batch_id)
+    end
+  end
+end
diff --git a/lib/searchkick/index.rb b/lib/searchkick/index.rb
index d2b8a5b..35a704e 100644
--- a/lib/searchkick/index.rb
+++ b/lib/searchkick/index.rb
@@ -191,7 +191,7 @@ module Searchkick
 
     # https://gist.github.com/jarosan/3124884
     # http://www.elasticsearch.org/blog/changing-mapping-with-zero-downtime/
-    def reindex_scope(scope, import: true, resume: false, retain: false)
+    def reindex_scope(scope, import: true, resume: false, retain: false, async: false)
       if resume
         index_name = all_indices.sort.last
         raise Searchkick::Error, "No index to resume" unless index_name
@@ -205,30 +205,58 @@ module Searchkick
       # check if alias exists
       if alias_exists?
         # import before promotion
-        index.import_scope(scope, resume: resume) if import
+        index.import_scope(scope, resume: resume, async: async, full: true) if import
 
         # get existing indices to remove
-        promote(index.name)
-        clean_indices unless retain
+        unless async
+          promote(index.name)
+          clean_indices unless retain
+        end
       else
         delete if exists?
         promote(index.name)
 
         # import after promotion
-        index.import_scope(scope, resume: resume) if import
+        index.import_scope(scope, resume: resume, async: async, full: true) if import
       end
 
-      index.refresh
-
-      true
+      if async
+        {index_name: index.name}
+      else
+        index.refresh
+        true
+      end
     end
 
-    def import_scope(scope, resume: false, method_name: nil)
+    def import_scope(scope, resume: false, method_name: nil, async: false, batch: false, batch_id: nil, full: false)
       batch_size = @options[:batch_size] || 1000
 
       # use scope for import
       scope = scope.search_import if scope.respond_to?(:search_import)
-      if scope.respond_to?(:find_in_batches)
+
+      if batch
+        import_or_update scope.to_a, method_name, async
+        Searchkick.redis.srem(batches_key, batch_id) if batch_id && Searchkick.redis
+      elsif full && async
+        # TODO expire Redis key
+        primary_key = scope.primary_key
+        starting_id = scope.minimum(primary_key)
+        max_id = scope.maximum(primary_key)
+        batches_count = ((max_id - starting_id + 1) / batch_size.to_f).ceil
+
+        batches_count.times do |i|
+          batch_id = i + 1
+          min_id = starting_id + (i * batch_size)
+          Searchkick::BulkReindexJob.perform_later(
+            class_name: scope.model_name.name,
+            min_id: min_id,
+            max_id: min_id + batch_size - 1,
+            index_name: name,
+            batch_id: batch_id
+          )
+          Searchkick.redis.sadd(batches_key, batch_id) if Searchkick.redis
+        end
+      elsif scope.respond_to?(:find_in_batches)
         if resume
           # use total docs instead of max id since there's not a great way
           # to get the max _id without scripting since it's a string
@@ -237,8 +265,10 @@ module Searchkick
           scope = scope.where("id > ?", total_docs)
         end
 
+        scope = scope.select("id").except(:includes, :preload) if async
+
         scope.find_in_batches batch_size: batch_size do |batch|
-          import_or_update batch.select(&:should_index?), method_name
+          import_or_update batch, method_name, async
         end
       else
         # https://github.com/karmi/tire/blob/master/lib/tire/model/import.rb
@@ -246,27 +276,18 @@ module Searchkick
         items = []
         # TODO add resume
         scope.all.each do |item|
-          items << item if item.should_index?
+          items << item
           if items.length == batch_size
-            import_or_update items, method_name
+            import_or_update items, method_name, async
             items = []
           end
         end
-        import_or_update items, method_name
+        import_or_update items, method_name, async
       end
     end
 
-    def import_or_update(records, method_name)
-      retries = 0
-      begin
-        method_name ? bulk_update(records, method_name) : import(records)
-      rescue Faraday::ClientError => e
-        if retries < 1
-          retries += 1
-          retry
-        end
-        raise e
-      end
+    def batches_left
+      Searchkick.redis.scard(batches_key) if Searchkick.redis
     end
 
     # other
@@ -377,5 +398,34 @@ module Searchkick
         obj
       end
     end
+
+    def import_or_update(records, method_name, async)
+      if records.any?
+        if async
+          Searchkick::BulkReindexJob.perform_later(
+            class_name: records.first.class.name,
+            record_ids: records.map(&:id),
+            index_name: name,
+            method_name: method_name ? method_name.to_s : nil
+          )
+        else
+          retries = 0
+          records = records.select(&:should_index?)
+          begin
+            method_name ? bulk_update(records, method_name) : import(records)
+          rescue Faraday::ClientError => e
+            if retries < 1
+              retries += 1
+              retry
+            end
+            raise e
+          end
+        end
+      end
+    end
+
+    def batches_key
+      "searchkick:reindex:#{name}:batches"
+    end
   end
 end
diff --git a/lib/searchkick/model.rb b/lib/searchkick/model.rb
index 055f841..a76a327 100644
--- a/lib/searchkick/model.rb
+++ b/lib/searchkick/model.rb
@@ -58,15 +58,16 @@ module Searchkick
             # update
             searchkick_index.import_scope(searchkick_klass, method_name: method_name)
             searchkick_index.refresh if refresh
+            true
           elsif scoped && !full
             # reindex association
             searchkick_index.import_scope(searchkick_klass)
             searchkick_index.refresh if refresh
+            true
           else
             # full reindex
             searchkick_index.reindex_scope(searchkick_klass, options)
           end
-          true
         end
 
         alias_method :reindex, :searchkick_reindex unless method_defined?(:reindex)
diff --git a/test/reindex_test.rb b/test/reindex_test.rb
index 827fec1..007ad7f 100644
--- a/test/reindex_test.rb
+++ b/test/reindex_test.rb
@@ -22,4 +22,21 @@ class ReindexTest < Minitest::Test
     store.products.reindex(refresh: true)
     assert_search "product", ["Product A", "Product B"]
   end
+
+  def test_async
+    skip unless defined?(ActiveJob) && defined?(ActiveRecord)
+
+    Searchkick.callbacks(false) do
+      store_names ["Product A"]
+    end
+    reindex = Product.reindex(async: true)
+    assert_search "product", []
+
+    index = Searchkick::Index.new(reindex[:index_name])
+    index.refresh
+    assert_equal 1, index.total_docs
+
+    Product.searchkick_index.promote(reindex[:index_name])
+    assert_search "product", ["Product A"]
+  end
 end
-- 
libgit2 0.21.0
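
Below is a rough end-to-end sketch of the workflow the README additions describe, not part of the patch itself. It assumes a `Product` model that already calls `searchkick`, ActiveJob workers running, and a Redis server reachable with default settings; only methods introduced or documented by this patch are used.

```ruby
require "searchkick"
require "redis"

# Optional: enables batch tracking via Searchkick.reindex_status and Index#batches_left.
Searchkick.redis = Redis.new

# Queues one BulkReindexJob per id-range batch and returns the new index name
# without promoting it.
result = Product.reindex(async: true)
index_name = result[:index_name]

# Poll until every queued batch has been processed (batches_left reaches 0).
sleep 5 until Searchkick.reindex_status(index_name)[:completed]

# Point the alias at the new index once the background jobs are done.
Product.searchkick_index.promote(index_name)
```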
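The Redis bookkeeping behind `reindex_status` is a plain set per index: `import_scope` adds each batch id when it enqueues a job, `BulkReindexJob` removes it after importing its slice, and `batches_left` is the set's cardinality. A standalone illustration of that protocol follows; the index name is only an example.

```ruby
require "redis"

redis = Redis.new
batches_key = "searchkick:reindex:products_production_20170111210018065:batches"

redis.sadd(batches_key, 1) # done by import_scope for each enqueued batch
redis.sadd(batches_key, 2)
redis.scard(batches_key)   # => 2, what Index#batches_left returns
redis.srem(batches_key, 1) # done by BulkReindexJob once its batch is imported
redis.scard(batches_key)   # => 1, so reindex_status reports completed: false
```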