Commit 3bd98e58d620c3f179a55fa99e058c5b42f3759e

Authored by Andrew Kane
1 parent efb694a1

Added async option for reindex

benchmark/Gemfile
... ... @@ -6,8 +6,13 @@ gemspec path: "../"
6 6 gem "sqlite3"
7 7 gem "activerecord", "~> 5.0.0"
8 8 gem "activerecord-import"
  9 +gem "activejob"
  10 +
  11 +# performance
9 12 gem "typhoeus"
10 13 gem "oj"
  14 +
  15 +# profiling
11 16 gem "ruby-prof"
12 17 gem "allocation_stats"
13 18 gem "get_process_mem"
... ...
benchmark/benchmark.rb
... ... @@ -2,12 +2,22 @@ require "bundler/setup"
2 2 Bundler.require(:default)
3 3 require "active_record"
4 4 require "benchmark"
  5 +require "active_support/notifications"
  6 +
  7 +# ActiveSupport::Notifications.subscribe "request.searchkick" do |*args|
  8 +# p args
  9 +# end
  10 +
  11 +# ActiveJob::Base.queue_adapter = :inline
5 12  
6 13 ActiveRecord::Base.default_timezone = :utc
7 14 ActiveRecord::Base.time_zone_aware_attributes = true
8   -ActiveRecord::Base.establish_connection adapter: "sqlite3", database: ":memory:"
  15 +ActiveRecord::Base.establish_connection adapter: "sqlite3", database: "/tmp/searchkick"
  16 +# ActiveRecord::Base.logger = Logger.new(STDOUT)
9 17  
10   -ActiveRecord::Migration.create_table :products do |t|
  18 +ActiveJob::Base.logger = nil
  19 +
  20 +ActiveRecord::Migration.create_table :products, force: :cascade do |t|
11 21 t.string :name
12 22 t.string :color
13 23 t.integer :store_id
... ... @@ -40,13 +50,16 @@ time =
40 50 # result = RubyProf.profile do
41 51 # report = MemoryProfiler.report do
42 52 # stats = AllocationStats.trace do
43   - Product.reindex
  53 + Product.reindex(async: true)
44 54 # end
45 55 end
46 56  
47 57 # p GetProcessMem.new.mb
48 58  
49 59 puts time.round(1)
  60 +
  61 +sleep(5)
  62 +Product.searchkick_index.refresh
50 63 puts Product.searchkick_index.total_docs
51 64  
52 65 if result
... ...
lib/searchkick.rb
... ... @@ -19,7 +19,10 @@ begin
19 19 rescue LoadError
20 20 # do nothing
21 21 end
22   -require "searchkick/reindex_v2_job" if defined?(ActiveJob)
  22 +if defined?(ActiveJob)
  23 + require "searchkick/bulk_reindex_job"
  24 + require "searchkick/reindex_v2_job"
  25 +end
23 26  
24 27 module Searchkick
25 28 class Error < StandardError; end
... ...
lib/searchkick/bulk_reindex_job.rb 0 โ†’ 100644
... ... @@ -0,0 +1,10 @@
  1 +module Searchkick
  2 + class BulkReindexJob < ActiveJob::Base
  3 + queue_as :searchkick
  4 +
  5 + def perform(klass, ids, method_name, index_name, index_options)
  6 + index = Searchkick::Index.new(index_name, index_options)
  7 + index.import_scope(klass.constantize.where(id: ids), method_name: method_name)
  8 + end
  9 + end
  10 +end
... ...
lib/searchkick/index.rb
... ... @@ -186,7 +186,7 @@ module Searchkick
186 186  
187 187 # https://gist.github.com/jarosan/3124884
188 188 # http://www.elasticsearch.org/blog/changing-mapping-with-zero-downtime/
189   - def reindex_scope(scope, import: true, resume: false)
  189 + def reindex_scope(scope, import: true, resume: false, async: false)
190 190 if resume
191 191 index_name = all_indices.sort.last
192 192 raise Searchkick::Error, "No index to resume" unless index_name
... ... @@ -200,25 +200,27 @@ module Searchkick
200 200 # check if alias exists
201 201 if alias_exists?
202 202 # import before swap
203   - index.import_scope(scope, resume: resume) if import
  203 + index.import_scope(scope, resume: resume, async: async) if import
204 204  
205 205 # get existing indices to remove
206   - swap(index.name)
207   - clean_indices
  206 + # unless async
  207 + swap(index.name)
  208 + clean_indices
  209 + # end
208 210 else
209 211 delete if exists?
210 212 swap(index.name)
211 213  
212 214 # import after swap
213   - index.import_scope(scope, resume: resume) if import
  215 + index.import_scope(scope, resume: resume, async: async) if import
214 216 end
215 217  
216   - index.refresh
  218 + index.refresh unless async
217 219  
218 220 true
219 221 end
220 222  
221   - def import_scope(scope, resume: false, method_name: nil)
  223 + def import_scope(scope, resume: false, method_name: nil, async: false)
222 224 batch_size = @options[:batch_size] || 1000
223 225  
224 226 # use scope for import
... ... @@ -232,8 +234,10 @@ module Searchkick
232 234 scope = scope.where("id > ?", total_docs)
233 235 end
234 236  
  237 + scope = scope.select("id") if async
  238 +
235 239 scope.find_in_batches batch_size: batch_size do |batch|
236   - import_or_update batch.select(&:should_index?), method_name
  240 + import_or_update batch, method_name, async
237 241 end
238 242 else
239 243 # https://github.com/karmi/tire/blob/master/lib/tire/model/import.rb
... ... @@ -241,26 +245,33 @@ module Searchkick
241 245 items = []
242 246 # TODO add resume
243 247 scope.all.each do |item|
244   - items << item if item.should_index?
  248 + items << item
245 249 if items.length == batch_size
246   - import_or_update items, method_name
  250 + import_or_update items, method_name, async
247 251 items = []
248 252 end
249 253 end
250   - import_or_update items, method_name
  254 + import_or_update items, method_name, async
251 255 end
252 256 end
253 257  
254   - def import_or_update(records, method_name)
255   - retries = 0
256   - begin
257   - method_name ? bulk_update(records, method_name) : import(records)
258   - rescue Faraday::ClientError => e
259   - if retries < 1
260   - retries += 1
261   - retry
  258 + def import_or_update(records, method_name, async)
  259 + if records.any?
  260 + if async
  261 + Searchkick::BulkReindexJob.perform_later(records.first.class.name, records.map(&:id), method_name, name, @options)
  262 + else
  263 + retries = 0
  264 + records = records.select(&:should_index?)
  265 + begin
  266 + method_name ? bulk_update(records, method_name) : import(records)
  267 + rescue Faraday::ClientError => e
  268 + if retries < 1
  269 + retries += 1
  270 + retry
  271 + end
  272 + raise e
  273 + end
262 274 end
263   - raise e
264 275 end
265 276 end
266 277  
... ...