Commit 9689ecadf2e75ab34ce0775f2b834aa3b8f49fb6

Authored by Andrew Kane
1 parent 3bd98e58

Reindex async threads

benchmark/Gemfile
@@ -6,11 +6,11 @@ gemspec path: "../" @@ -6,11 +6,11 @@ gemspec path: "../"
6 gem "sqlite3" 6 gem "sqlite3"
7 gem "activerecord", "~> 5.0.0" 7 gem "activerecord", "~> 5.0.0"
8 gem "activerecord-import" 8 gem "activerecord-import"
9 -gem "activejob"  
10 9
11 # performance 10 # performance
12 gem "typhoeus" 11 gem "typhoeus"
13 gem "oj" 12 gem "oj"
  13 +gem "thread"
14 14
15 # profiling 15 # profiling
16 gem "ruby-prof" 16 gem "ruby-prof"
benchmark/benchmark.rb
@@ -13,9 +13,9 @@ require "active_support/notifications" @@ -13,9 +13,9 @@ require "active_support/notifications"
13 ActiveRecord::Base.default_timezone = :utc 13 ActiveRecord::Base.default_timezone = :utc
14 ActiveRecord::Base.time_zone_aware_attributes = true 14 ActiveRecord::Base.time_zone_aware_attributes = true
15 ActiveRecord::Base.establish_connection adapter: "sqlite3", database: "/tmp/searchkick" 15 ActiveRecord::Base.establish_connection adapter: "sqlite3", database: "/tmp/searchkick"
16 -# ActiveRecord::Base.logger = Logger.new(STDOUT) 16 +ActiveRecord::Base.logger = Logger.new(STDOUT)
17 17
18 -ActiveJob::Base.logger = nil 18 +# ActiveJob::Base.logger = nil
19 19
20 ActiveRecord::Migration.create_table :products, force: :cascade do |t| 20 ActiveRecord::Migration.create_table :products, force: :cascade do |t|
21 t.string :name 21 t.string :name
@@ -35,7 +35,7 @@ class Product < ActiveRecord::Base @@ -35,7 +35,7 @@ class Product < ActiveRecord::Base
35 end 35 end
36 end 36 end
37 37
38 -Product.import ["name", "color", "store_id"], 20000.times.map { |i| ["Product #{i}", ["red", "blue"].sample, rand(10)] } 38 +Product.import ["name", "color", "store_id"], 100000.times.map { |i| ["Product #{i}", ["red", "blue"].sample, rand(10)] }
39 39
40 puts "Imported" 40 puts "Imported"
41 41
@@ -58,8 +58,8 @@ time = @@ -58,8 +58,8 @@ time =
58 58
59 puts time.round(1) 59 puts time.round(1)
60 60
61 -sleep(5)  
62 -Product.searchkick_index.refresh 61 +# sleep(5)
  62 +# Product.searchkick_index.refresh
63 puts Product.searchkick_index.total_docs 63 puts Product.searchkick_index.total_docs
64 64
65 if result 65 if result
lib/searchkick.rb
@@ -20,7 +20,6 @@ rescue LoadError @@ -20,7 +20,6 @@ rescue LoadError
20 # do nothing 20 # do nothing
21 end 21 end
22 if defined?(ActiveJob) 22 if defined?(ActiveJob)
23 - require "searchkick/bulk_reindex_job"  
24 require "searchkick/reindex_v2_job" 23 require "searchkick/reindex_v2_job"
25 end 24 end
26 25
lib/searchkick/bulk_reindex_job.rb
@@ -1,10 +0,0 @@ @@ -1,10 +0,0 @@
1 -module Searchkick  
2 - class BulkReindexJob < ActiveJob::Base  
3 - queue_as :searchkick  
4 -  
5 - def perform(klass, ids, method_name, index_name, index_options)  
6 - index = Searchkick::Index.new(index_name, index_options)  
7 - index.import_scope(klass.constantize.where(id: ids), method_name: method_name)  
8 - end  
9 - end  
10 -end  
lib/searchkick/index.rb
@@ -203,10 +203,8 @@ module Searchkick @@ -203,10 +203,8 @@ module Searchkick
203 index.import_scope(scope, resume: resume, async: async) if import 203 index.import_scope(scope, resume: resume, async: async) if import
204 204
205 # get existing indices to remove 205 # get existing indices to remove
206 - # unless async  
207 - swap(index.name)  
208 - clean_indices  
209 - # end 206 + swap(index.name)
  207 + clean_indices
210 else 208 else
211 delete if exists? 209 delete if exists?
212 swap(index.name) 210 swap(index.name)
@@ -215,7 +213,7 @@ module Searchkick @@ -215,7 +213,7 @@ module Searchkick
215 index.import_scope(scope, resume: resume, async: async) if import 213 index.import_scope(scope, resume: resume, async: async) if import
216 end 214 end
217 215
218 - index.refresh unless async 216 + index.refresh
219 217
220 true 218 true
221 end 219 end
@@ -223,6 +221,8 @@ module Searchkick @@ -223,6 +221,8 @@ module Searchkick
223 def import_scope(scope, resume: false, method_name: nil, async: false) 221 def import_scope(scope, resume: false, method_name: nil, async: false)
224 batch_size = @options[:batch_size] || 1000 222 batch_size = @options[:batch_size] || 1000
225 223
  224 + klass = scope
  225 +
226 # use scope for import 226 # use scope for import
227 scope = scope.search_import if scope.respond_to?(:search_import) 227 scope = scope.search_import if scope.respond_to?(:search_import)
228 if scope.respond_to?(:find_in_batches) 228 if scope.respond_to?(:find_in_batches)
@@ -236,8 +236,17 @@ module Searchkick @@ -236,8 +236,17 @@ module Searchkick
236 236
237 scope = scope.select("id") if async 237 scope = scope.select("id") if async
238 238
239 - scope.find_in_batches batch_size: batch_size do |batch|  
240 - import_or_update batch, method_name, async 239 + if async
  240 + require "thread/pool"
  241 + pool = Thread.pool(4)
  242 + end
  243 +
  244 + if async
  245 + scope.find_in_batches batch_size: batch_size do |batch|
  246 + import_or_update batch, method_name, klass, pool
  247 + end
  248 + else
  249 + import_or_update scope.to_a, method_name, klass, pool
241 end 250 end
242 else 251 else
243 # https://github.com/karmi/tire/blob/master/lib/tire/model/import.rb 252 # https://github.com/karmi/tire/blob/master/lib/tire/model/import.rb
@@ -247,18 +256,24 @@ module Searchkick @@ -247,18 +256,24 @@ module Searchkick
247 scope.all.each do |item| 256 scope.all.each do |item|
248 items << item 257 items << item
249 if items.length == batch_size 258 if items.length == batch_size
250 - import_or_update items, method_name, async 259 + import_or_update items, method_name, pool
251 items = [] 260 items = []
252 end 261 end
253 end 262 end
254 - import_or_update items, method_name, async 263 + import_or_update items, method_name, pool
255 end 264 end
  265 + ensure
  266 + pool.shutdown if pool
256 end 267 end
257 268
258 - def import_or_update(records, method_name, async) 269 + def import_or_update(records, method_name, klass, pool)
259 if records.any? 270 if records.any?
260 - if async  
261 - Searchkick::BulkReindexJob.perform_later(records.first.class.name, records.map(&:id), method_name, name, @options) 271 + if pool
  272 + pool.process do
  273 + klass.connection_pool.with_connection do
  274 + import_scope(klass.where(id: records.map(&:id)), method_name: method_name)
  275 + end
  276 + end
262 else 277 else
263 retries = 0 278 retries = 0
264 records = records.select(&:should_index?) 279 records = records.select(&:should_index?)