Commit 929c4ce7ec8186bd92e6b0d1791e0b1dcb85506e

Authored by Andrew Kane
1 parent 8dc87dc6

Added async reindex

README.md
... ... @@ -1156,6 +1156,37 @@ Reindex and search with:
1156 1156 Business.search "ice cream", routing: params[:city_id]
1157 1157 ```
1158 1158  
  1159 +## Large Data Sets
  1160 +
  1161 +### Background Reindexing [experimental, ActiveRecord only]
  1162 +
  1163 +For large data sets, you can use background jobs to parallelize reindexing.
  1164 +
  1165 +```ruby
  1166 +Product.reindex(async: true)
  1167 +# {index_name: "products_production_20170111210018065"}
  1168 +```
  1169 +
  1170 +Once the jobs complete, promote the new index with:
  1171 +
  1172 +```ruby
  1173 +Product.searchkick_index.promote(index_name)
  1174 +```
  1175 +
  1176 +You can optionally track the status with Redis:
  1177 +
  1178 +```ruby
  1179 +Searchkick.redis = Redis.new
  1180 +```
  1181 +
  1182 +And use:
  1183 +
  1184 +```ruby
  1185 +Searchkick.reindex_status(index_name)
  1186 +```
  1187 +
  1188 +For more tips, check out [Keeping Elasticsearch in Sync](https://www.elastic.co/blog/found-keeping-elasticsearch-in-sync).
  1189 +
1159 1190 ## Advanced
1160 1191  
1161 1192 Prefer to use the [Elasticsearch DSL](https://www.elastic.co/guide/en/elasticsearch/reference/current/query-dsl-queries.html) but still want awesome features like zero-downtime reindexing?
... ... @@ -1457,10 +1488,6 @@ Product.search "api", misspellings: {prefix_length: 2} # api, apt, no ahi
1457 1488 Product.search "ah", misspellings: {prefix_length: 2} # ah, no aha
1458 1489 ```
1459 1490  
1460   -## Large Data Sets
1461   -
1462   -For large data sets, check out [Keeping Elasticsearch in Sync](https://www.elastic.co/blog/found-keeping-elasticsearch-in-sync). Searchkick will make this easy in the future.
1463   -
1464 1491 ## Testing
1465 1492  
1466 1493 This section could use some love.
... ...
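Putting the new README steps together: below is a minimal sketch of a one-off script, assuming an ActiveJob backend (e.g. Sidekiq) is working the `searchkick` queue and that `Searchkick.redis` is configured; the polling interval is arbitrary.

```ruby
# Sketch only: assumes a job worker is processing the :searchkick queue.
Searchkick.redis = Redis.new

result = Product.reindex(async: true)
index_name = result[:index_name]

# Wait for every queued batch to be imported (requires Searchkick.redis).
sleep 5 until Searchkick.reindex_status(index_name)[:completed]

# Swap the alias over to the newly built index.
# Note: old indices are not cleaned up automatically in async mode.
Product.searchkick_index.promote(index_name)
```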
lib/searchkick.rb
... ... @@ -19,7 +19,10 @@ begin
19 19 rescue LoadError
20 20 # do nothing
21 21 end
22   -require "searchkick/reindex_v2_job" if defined?(ActiveJob)
  22 +if defined?(ActiveJob)
  23 + require "searchkick/bulk_reindex_job"
  24 + require "searchkick/reindex_v2_job"
  25 +end
23 26  
24 27 module Searchkick
25 28 class Error < StandardError; end
... ... @@ -30,7 +33,7 @@ module Searchkick
30 33 class ImportError < Error; end
31 34  
32 35 class << self
33   - attr_accessor :search_method_name, :wordnet_path, :timeout, :models, :client_options
  36 + attr_accessor :search_method_name, :wordnet_path, :timeout, :models, :client_options, :redis
34 37 attr_writer :client, :env, :search_timeout
35 38 attr_reader :aws_credentials
36 39 end
... ... @@ -129,6 +132,16 @@ module Searchkick
129 132 @client = nil # reset client
130 133 end
131 134  
  135 + def self.reindex_status(index_name)
  136 + if redis
  137 + batches_left = Searchkick::Index.new(index_name).batches_left
  138 + {
  139 + completed: batches_left == 0,
  140 + batches_left: batches_left
  141 + }
  142 + end
  143 + end
  144 +
132 145 # private
133 146 def self.indexer
134 147 Thread.current[:searchkick_indexer] ||= Searchkick::Indexer.new
... ...
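For reference, `reindex_status` only reports progress when `Searchkick.redis` has been set; the counts below are illustrative.

```ruby
Searchkick.redis = Redis.new

Searchkick.reindex_status("products_production_20170111210018065")
# => {completed: false, batches_left: 3}   (illustrative values)

# With no Searchkick.redis configured, the method returns nil.
```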
lib/searchkick/bulk_reindex_job.rb 0 → 100644
... ... @@ -0,0 +1,12 @@
  1 +module Searchkick
  2 + class BulkReindexJob < ActiveJob::Base
  3 + queue_as :searchkick
  4 +
  5 + def perform(class_name:, record_ids: nil, index_name: nil, method_name: nil, batch_id: nil, min_id: nil, max_id: nil)
  6 + klass = class_name.constantize
  7 + index = index_name ? Searchkick::Index.new(index_name) : klass.searchkick_index
  8 + record_ids ||= min_id..max_id
  9 + index.import_scope(klass.where(klass.primary_key => record_ids), method_name: method_name, batch: true, batch_id: batch_id)
  10 + end
  11 + end
  12 +end
... ...
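The job is normally enqueued by `import_scope` (see `lib/searchkick/index.rb` below), but for illustration a single batch could also be queued by hand; the class name, id range, and index name here are placeholders.

```ruby
# Reindex records with ids 1..1000 into a specific (not yet promoted) index.
Searchkick::BulkReindexJob.perform_later(
  class_name: "Product",
  min_id: 1,
  max_id: 1000,
  index_name: "products_production_20170111210018065",
  batch_id: 1  # removed from the Redis tracking set on completion, if Searchkick.redis is set
)
```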
lib/searchkick/index.rb
... ... @@ -191,7 +191,7 @@ module Searchkick
191 191  
192 192 # https://gist.github.com/jarosan/3124884
193 193 # http://www.elasticsearch.org/blog/changing-mapping-with-zero-downtime/
194   - def reindex_scope(scope, import: true, resume: false, retain: false)
  194 + def reindex_scope(scope, import: true, resume: false, retain: false, async: false)
195 195 if resume
196 196 index_name = all_indices.sort.last
197 197 raise Searchkick::Error, "No index to resume" unless index_name
... ... @@ -205,30 +205,58 @@ module Searchkick
205 205 # check if alias exists
206 206 if alias_exists?
207 207 # import before promotion
208   - index.import_scope(scope, resume: resume) if import
  208 + index.import_scope(scope, resume: resume, async: async, full: true) if import
209 209  
210 210 # get existing indices to remove
211   - promote(index.name)
212   - clean_indices unless retain
  211 + unless async
  212 + promote(index.name)
  213 + clean_indices unless retain
  214 + end
213 215 else
214 216 delete if exists?
215 217 promote(index.name)
216 218  
217 219 # import after promotion
218   - index.import_scope(scope, resume: resume) if import
  220 + index.import_scope(scope, resume: resume, async: async, full: true) if import
219 221 end
220 222  
221   - index.refresh
222   -
223   - true
  223 + if async
  224 + {index_name: index.name}
  225 + else
  226 + index.refresh
  227 + true
  228 + end
224 229 end
225 230  
226   - def import_scope(scope, resume: false, method_name: nil)
  231 + def import_scope(scope, resume: false, method_name: nil, async: false, batch: false, batch_id: nil, full: false)
227 232 batch_size = @options[:batch_size] || 1000
228 233  
229 234 # use scope for import
230 235 scope = scope.search_import if scope.respond_to?(:search_import)
231   - if scope.respond_to?(:find_in_batches)
  236 +
  237 + if batch
  238 + import_or_update scope.to_a, method_name, async
  239 + Searchkick.redis.srem(batches_key, batch_id) if batch_id && Searchkick.redis
  240 + elsif full && async
  241 + # TODO expire Redis key
  242 + primary_key = scope.primary_key
  243 + starting_id = scope.minimum(primary_key)
  244 + max_id = scope.maximum(primary_key)
  245 + batches_count = ((max_id - starting_id + 1) / batch_size.to_f).ceil
  246 +
  247 + batches_count.times do |i|
  248 + batch_id = i + 1
  249 + min_id = starting_id + (i * batch_size)
  250 + Searchkick::BulkReindexJob.perform_later(
  251 + class_name: scope.model_name.name,
  252 + min_id: min_id,
  253 + max_id: min_id + batch_size - 1,
  254 + index_name: name,
  255 + batch_id: batch_id
  256 + )
  257 + Searchkick.redis.sadd(batches_key, batch_id) if Searchkick.redis
  258 + end
  259 + elsif scope.respond_to?(:find_in_batches)
232 260 if resume
233 261 # use total docs instead of max id since there's not a great way
234 262 # to get the max _id without scripting since it's a string
... ... @@ -237,8 +265,10 @@ module Searchkick
237 265 scope = scope.where("id > ?", total_docs)
238 266 end
239 267  
  268 + scope = scope.select("id").except(:includes, :preload) if async
  269 +
240 270 scope.find_in_batches batch_size: batch_size do |batch|
241   - import_or_update batch.select(&:should_index?), method_name
  271 + import_or_update batch, method_name, async
242 272 end
243 273 else
244 274 # https://github.com/karmi/tire/blob/master/lib/tire/model/import.rb
... ... @@ -246,27 +276,18 @@ module Searchkick
246 276 items = []
247 277 # TODO add resume
248 278 scope.all.each do |item|
249   - items << item if item.should_index?
  279 + items << item
250 280 if items.length == batch_size
251   - import_or_update items, method_name
  281 + import_or_update items, method_name, async
252 282 items = []
253 283 end
254 284 end
255   - import_or_update items, method_name
  285 + import_or_update items, method_name, async
256 286 end
257 287 end
258 288  
259   - def import_or_update(records, method_name)
260   - retries = 0
261   - begin
262   - method_name ? bulk_update(records, method_name) : import(records)
263   - rescue Faraday::ClientError => e
264   - if retries < 1
265   - retries += 1
266   - retry
267   - end
268   - raise e
269   - end
  289 + def batches_left
  290 + Searchkick.redis.scard(batches_key) if Searchkick.redis
270 291 end
271 292  
272 293 # other
... ... @@ -377,5 +398,34 @@ module Searchkick
377 398 obj
378 399 end
379 400 end
  401 +
  402 + def import_or_update(records, method_name, async)
  403 + if records.any?
  404 + if async
  405 + Searchkick::BulkReindexJob.perform_later(
  406 + class_name: records.first.class.name,
  407 + record_ids: records.map(&:id),
  408 + index_name: name,
  409 + method_name: method_name ? method_name.to_s : nil
  410 + )
  411 + else
  412 + retries = 0
  413 + records = records.select(&:should_index?)
  414 + begin
  415 + method_name ? bulk_update(records, method_name) : import(records)
  416 + rescue Faraday::ClientError => e
  417 + if retries < 1
  418 + retries += 1
  419 + retry
  420 + end
  421 + raise e
  422 + end
  423 + end
  424 + end
  425 + end
  426 +
  427 + def batches_key
  428 + "searchkick:reindex:#{name}:batches"
  429 + end
380 430 end
381 431 end
... ...
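To make the id-range batching in `import_scope` concrete, here is a quick worked example with illustrative numbers (contiguous integer primary keys, default `batch_size` of 1000).

```ruby
batch_size  = 1000
starting_id = 1     # scope.minimum(primary_key)
max_id      = 2500  # scope.maximum(primary_key)

batches_count = ((max_id - starting_id + 1) / batch_size.to_f).ceil
# => 3

batches_count.times.map { |i| min = starting_id + (i * batch_size); [min, min + batch_size - 1] }
# => [[1, 1000], [1001, 2000], [2001, 3000]]
# Gaps in the id sequence simply produce smaller (possibly empty) batches.
```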
lib/searchkick/model.rb
... ... @@ -58,15 +58,16 @@ module Searchkick
58 58 # update
59 59 searchkick_index.import_scope(searchkick_klass, method_name: method_name)
60 60 searchkick_index.refresh if refresh
  61 + true
61 62 elsif scoped && !full
62 63 # reindex association
63 64 searchkick_index.import_scope(searchkick_klass)
64 65 searchkick_index.refresh if refresh
  66 + true
65 67 else
66 68 # full reindex
67 69 searchkick_index.reindex_scope(searchkick_klass, options)
68 70 end
69   - true
70 71 end
71 72 alias_method :reindex, :searchkick_reindex unless method_defined?(:reindex)
72 73  
... ...
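With this change, `reindex` returns consistently: partial and association reindexes return `true` explicitly instead of relying on the trailing `true`, while an async full reindex returns the hash from `reindex_scope`. Illustrative values:

```ruby
Product.reindex               # => true
Product.reindex(async: true)  # => {index_name: "products_production_20170111210018065"}
```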
test/reindex_test.rb
... ... @@ -22,4 +22,21 @@ class ReindexTest < Minitest::Test
22 22 store.products.reindex(refresh: true)
23 23 assert_search "product", ["Product A", "Product B"]
24 24 end
  25 +
  26 + def test_async
  27 + skip unless defined?(ActiveJob) && defined?(ActiveRecord)
  28 +
  29 + Searchkick.callbacks(false) do
  30 + store_names ["Product A"]
  31 + end
  32 + reindex = Product.reindex(async: true)
  33 + assert_search "product", []
  34 +
  35 + index = Searchkick::Index.new(reindex[:index_name])
  36 + index.refresh
  37 + assert_equal 1, index.total_docs
  38 +
  39 + Product.searchkick_index.promote(reindex[:index_name])
  40 + assert_search "product", ["Product A"]
  41 + end
25 42 end
... ...