From 3bd98e58d620c3f179a55fa99e058c5b42f3759e Mon Sep 17 00:00:00 2001 From: Andrew Kane Date: Fri, 30 Dec 2016 19:22:25 -0800 Subject: [PATCH] Added async option for reindex --- benchmark/Gemfile | 5 +++++ benchmark/benchmark.rb | 19 ++++++++++++++++--- lib/searchkick.rb | 5 ++++- lib/searchkick/bulk_reindex_job.rb | 10 ++++++++++ lib/searchkick/index.rb | 51 +++++++++++++++++++++++++++++++-------------------- 5 files changed, 66 insertions(+), 24 deletions(-) create mode 100644 lib/searchkick/bulk_reindex_job.rb diff --git a/benchmark/Gemfile b/benchmark/Gemfile index 682086d..3bc6fbc 100644 --- a/benchmark/Gemfile +++ b/benchmark/Gemfile @@ -6,8 +6,13 @@ gemspec path: "../" gem "sqlite3" gem "activerecord", "~> 5.0.0" gem "activerecord-import" +gem "activejob" + +# performance gem "typhoeus" gem "oj" + +# profiling gem "ruby-prof" gem "allocation_stats" gem "get_process_mem" diff --git a/benchmark/benchmark.rb b/benchmark/benchmark.rb index 035deb5..b6a72a8 100644 --- a/benchmark/benchmark.rb +++ b/benchmark/benchmark.rb @@ -2,12 +2,22 @@ require "bundler/setup" Bundler.require(:default) require "active_record" require "benchmark" +require "active_support/notifications" + +# ActiveSupport::Notifications.subscribe "request.searchkick" do |*args| +# p args +# end + +# ActiveJob::Base.queue_adapter = :inline ActiveRecord::Base.default_timezone = :utc ActiveRecord::Base.time_zone_aware_attributes = true -ActiveRecord::Base.establish_connection adapter: "sqlite3", database: ":memory:" +ActiveRecord::Base.establish_connection adapter: "sqlite3", database: "/tmp/searchkick" +# ActiveRecord::Base.logger = Logger.new(STDOUT) -ActiveRecord::Migration.create_table :products do |t| +ActiveJob::Base.logger = nil + +ActiveRecord::Migration.create_table :products, force: :cascade do |t| t.string :name t.string :color t.integer :store_id @@ -40,13 +50,16 @@ time = # result = RubyProf.profile do # report = MemoryProfiler.report do # stats = AllocationStats.trace do - Product.reindex + Product.reindex(async: true) # end end # p GetProcessMem.new.mb puts time.round(1) + +sleep(5) +Product.searchkick_index.refresh puts Product.searchkick_index.total_docs if result diff --git a/lib/searchkick.rb b/lib/searchkick.rb index 46d4cf2..f5ff628 100644 --- a/lib/searchkick.rb +++ b/lib/searchkick.rb @@ -19,7 +19,10 @@ begin rescue LoadError # do nothing end -require "searchkick/reindex_v2_job" if defined?(ActiveJob) +if defined?(ActiveJob) + require "searchkick/bulk_reindex_job" + require "searchkick/reindex_v2_job" +end module Searchkick class Error < StandardError; end diff --git a/lib/searchkick/bulk_reindex_job.rb b/lib/searchkick/bulk_reindex_job.rb new file mode 100644 index 0000000..d747f1f --- /dev/null +++ b/lib/searchkick/bulk_reindex_job.rb @@ -0,0 +1,10 @@ +module Searchkick + class BulkReindexJob < ActiveJob::Base + queue_as :searchkick + + def perform(klass, ids, method_name, index_name, index_options) + index = Searchkick::Index.new(index_name, index_options) + index.import_scope(klass.constantize.where(id: ids), method_name: method_name) + end + end +end diff --git a/lib/searchkick/index.rb b/lib/searchkick/index.rb index 336832e..1cfb5de 100644 --- a/lib/searchkick/index.rb +++ b/lib/searchkick/index.rb @@ -186,7 +186,7 @@ module Searchkick # https://gist.github.com/jarosan/3124884 # http://www.elasticsearch.org/blog/changing-mapping-with-zero-downtime/ - def reindex_scope(scope, import: true, resume: false) + def reindex_scope(scope, import: true, resume: false, async: false) if resume index_name = all_indices.sort.last raise Searchkick::Error, "No index to resume" unless index_name @@ -200,25 +200,27 @@ module Searchkick # check if alias exists if alias_exists? # import before swap - index.import_scope(scope, resume: resume) if import + index.import_scope(scope, resume: resume, async: async) if import # get existing indices to remove - swap(index.name) - clean_indices + # unless async + swap(index.name) + clean_indices + # end else delete if exists? swap(index.name) # import after swap - index.import_scope(scope, resume: resume) if import + index.import_scope(scope, resume: resume, async: async) if import end - index.refresh + index.refresh unless async true end - def import_scope(scope, resume: false, method_name: nil) + def import_scope(scope, resume: false, method_name: nil, async: false) batch_size = @options[:batch_size] || 1000 # use scope for import @@ -232,8 +234,10 @@ module Searchkick scope = scope.where("id > ?", total_docs) end + scope = scope.select("id") if async + scope.find_in_batches batch_size: batch_size do |batch| - import_or_update batch.select(&:should_index?), method_name + import_or_update batch, method_name, async end else # https://github.com/karmi/tire/blob/master/lib/tire/model/import.rb @@ -241,26 +245,33 @@ module Searchkick items = [] # TODO add resume scope.all.each do |item| - items << item if item.should_index? + items << item if items.length == batch_size - import_or_update items, method_name + import_or_update items, method_name, async items = [] end end - import_or_update items, method_name + import_or_update items, method_name, async end end - def import_or_update(records, method_name) - retries = 0 - begin - method_name ? bulk_update(records, method_name) : import(records) - rescue Faraday::ClientError => e - if retries < 1 - retries += 1 - retry + def import_or_update(records, method_name, async) + if records.any? + if async + Searchkick::BulkReindexJob.perform_later(records.first.class.name, records.map(&:id), method_name, name, @options) + else + retries = 0 + records = records.select(&:should_index?) + begin + method_name ? bulk_update(records, method_name) : import(records) + rescue Faraday::ClientError => e + if retries < 1 + retries += 1 + retry + end + raise e + end end - raise e end end -- libgit2 0.21.0