diff --git a/benchmark/Gemfile b/benchmark/Gemfile index 3bc6fbc..b635e3c 100644 --- a/benchmark/Gemfile +++ b/benchmark/Gemfile @@ -6,11 +6,11 @@ gemspec path: "../" gem "sqlite3" gem "activerecord", "~> 5.0.0" gem "activerecord-import" -gem "activejob" # performance gem "typhoeus" gem "oj" +gem "thread" # profiling gem "ruby-prof" diff --git a/benchmark/benchmark.rb b/benchmark/benchmark.rb index b6a72a8..dc1c27d 100644 --- a/benchmark/benchmark.rb +++ b/benchmark/benchmark.rb @@ -13,9 +13,9 @@ require "active_support/notifications" ActiveRecord::Base.default_timezone = :utc ActiveRecord::Base.time_zone_aware_attributes = true ActiveRecord::Base.establish_connection adapter: "sqlite3", database: "/tmp/searchkick" -# ActiveRecord::Base.logger = Logger.new(STDOUT) +ActiveRecord::Base.logger = Logger.new(STDOUT) -ActiveJob::Base.logger = nil +# ActiveJob::Base.logger = nil ActiveRecord::Migration.create_table :products, force: :cascade do |t| t.string :name @@ -35,7 +35,7 @@ class Product < ActiveRecord::Base end end -Product.import ["name", "color", "store_id"], 20000.times.map { |i| ["Product #{i}", ["red", "blue"].sample, rand(10)] } +Product.import ["name", "color", "store_id"], 100000.times.map { |i| ["Product #{i}", ["red", "blue"].sample, rand(10)] } puts "Imported" @@ -58,8 +58,8 @@ time = puts time.round(1) -sleep(5) -Product.searchkick_index.refresh +# sleep(5) +# Product.searchkick_index.refresh puts Product.searchkick_index.total_docs if result diff --git a/lib/searchkick.rb b/lib/searchkick.rb index f5ff628..214401f 100644 --- a/lib/searchkick.rb +++ b/lib/searchkick.rb @@ -20,7 +20,6 @@ rescue LoadError # do nothing end if defined?(ActiveJob) - require "searchkick/bulk_reindex_job" require "searchkick/reindex_v2_job" end diff --git a/lib/searchkick/bulk_reindex_job.rb b/lib/searchkick/bulk_reindex_job.rb deleted file mode 100644 index d747f1f..0000000 --- a/lib/searchkick/bulk_reindex_job.rb +++ /dev/null @@ -1,10 +0,0 @@ -module Searchkick - class BulkReindexJob < ActiveJob::Base - queue_as :searchkick - - def perform(klass, ids, method_name, index_name, index_options) - index = Searchkick::Index.new(index_name, index_options) - index.import_scope(klass.constantize.where(id: ids), method_name: method_name) - end - end -end diff --git a/lib/searchkick/index.rb b/lib/searchkick/index.rb index 1cfb5de..4331a77 100644 --- a/lib/searchkick/index.rb +++ b/lib/searchkick/index.rb @@ -203,10 +203,8 @@ module Searchkick index.import_scope(scope, resume: resume, async: async) if import # get existing indices to remove - # unless async - swap(index.name) - clean_indices - # end + swap(index.name) + clean_indices else delete if exists? swap(index.name) @@ -215,7 +213,7 @@ module Searchkick index.import_scope(scope, resume: resume, async: async) if import end - index.refresh unless async + index.refresh true end @@ -223,6 +221,8 @@ module Searchkick def import_scope(scope, resume: false, method_name: nil, async: false) batch_size = @options[:batch_size] || 1000 + klass = scope + # use scope for import scope = scope.search_import if scope.respond_to?(:search_import) if scope.respond_to?(:find_in_batches) @@ -236,8 +236,17 @@ module Searchkick scope = scope.select("id") if async - scope.find_in_batches batch_size: batch_size do |batch| - import_or_update batch, method_name, async + if async + require "thread/pool" + pool = Thread.pool(4) + end + + if async + scope.find_in_batches batch_size: batch_size do |batch| + import_or_update batch, method_name, klass, pool + end + else + import_or_update scope.to_a, method_name, klass, pool end else # https://github.com/karmi/tire/blob/master/lib/tire/model/import.rb @@ -247,18 +256,24 @@ module Searchkick scope.all.each do |item| items << item if items.length == batch_size - import_or_update items, method_name, async + import_or_update items, method_name, pool items = [] end end - import_or_update items, method_name, async + import_or_update items, method_name, pool end + ensure + pool.shutdown if pool end - def import_or_update(records, method_name, async) + def import_or_update(records, method_name, klass, pool) if records.any? - if async - Searchkick::BulkReindexJob.perform_later(records.first.class.name, records.map(&:id), method_name, name, @options) + if pool + pool.process do + klass.connection_pool.with_connection do + import_scope(klass.where(id: records.map(&:id)), method_name: method_name) + end + end else retries = 0 records = records.select(&:should_index?) -- libgit2 0.21.0