Commit f075cfd84109b585496d96a484a42052ecf25c8f
1 parent
910ff735
Exists in
master
and in
19 other branches
Cleaner code
Showing
1 changed file
with
44 additions
and
39 deletions
Show diff stats
lib/searchkick/index.rb
@@ -248,8 +248,6 @@ module Searchkick | @@ -248,8 +248,6 @@ module Searchkick | ||
248 | end | 248 | end |
249 | 249 | ||
250 | def import_scope(scope, resume: false, method_name: nil, async: false, batch: false, batch_id: nil, full: false) | 250 | def import_scope(scope, resume: false, method_name: nil, async: false, batch: false, batch_id: nil, full: false) |
251 | - batch_size = @options[:batch_size] || 1000 | ||
252 | - | ||
253 | # use scope for import | 251 | # use scope for import |
254 | scope = scope.search_import if scope.respond_to?(:search_import) | 252 | scope = scope.search_import if scope.respond_to?(:search_import) |
255 | 253 | ||
@@ -257,32 +255,7 @@ module Searchkick | @@ -257,32 +255,7 @@ module Searchkick | ||
257 | import_or_update scope.to_a, method_name, async | 255 | import_or_update scope.to_a, method_name, async |
258 | redis.srem(batches_key, batch_id) if batch_id && redis | 256 | redis.srem(batches_key, batch_id) if batch_id && redis |
259 | elsif full && async | 257 | elsif full && async |
260 | - if scope.respond_to?(:primary_key) | ||
261 | - # TODO expire Redis key | ||
262 | - primary_key = scope.primary_key | ||
263 | - starting_id = scope.minimum(primary_key) || 0 | ||
264 | - max_id = scope.maximum(primary_key) || 0 | ||
265 | - batches_count = ((max_id - starting_id + 1) / batch_size.to_f).ceil | ||
266 | - | ||
267 | - batches_count.times do |i| | ||
268 | - batch_id = i + 1 | ||
269 | - min_id = starting_id + (i * batch_size) | ||
270 | - bulk_reindex_job scope, batch_id, min_id: min_id, max_id: min_id + batch_size - 1 | ||
271 | - end | ||
272 | - else | ||
273 | - batch_id = 1 | ||
274 | - items = [] | ||
275 | - scope = scope.only(:_id) if scope.respond_to?(:only) | ||
276 | - scope.each do |item| | ||
277 | - items << item | ||
278 | - if items.length == batch_size | ||
279 | - bulk_reindex_job scope, batch_id, record_ids: items.map { |i| i.id.to_s } | ||
280 | - items = [] | ||
281 | - batch_id += 1 | ||
282 | - end | ||
283 | - end | ||
284 | - bulk_reindex_job scope, batch_id, record_ids: items.map { |i| i.id.to_s } | ||
285 | - end | 258 | + full_reindex_async(scope) |
286 | elsif scope.respond_to?(:find_in_batches) | 259 | elsif scope.respond_to?(:find_in_batches) |
287 | if resume | 260 | if resume |
288 | # use total docs instead of max id since there's not a great way | 261 | # use total docs instead of max id since there's not a great way |
@@ -298,18 +271,9 @@ module Searchkick | @@ -298,18 +271,9 @@ module Searchkick | ||
298 | import_or_update batch, method_name, async | 271 | import_or_update batch, method_name, async |
299 | end | 272 | end |
300 | else | 273 | else |
301 | - # https://github.com/karmi/tire/blob/master/lib/tire/model/import.rb | ||
302 | - # use cursor for Mongoid | ||
303 | - items = [] | ||
304 | - # TODO add resume | ||
305 | - scope.all.each do |item| | ||
306 | - items << item | ||
307 | - if items.length == batch_size | ||
308 | - import_or_update items, method_name, async | ||
309 | - items = [] | ||
310 | - end | 274 | + each_batch(scope) do |items| |
275 | + import_or_update items, method_name, async | ||
311 | end | 276 | end |
312 | - import_or_update items, method_name, async | ||
313 | end | 277 | end |
314 | end | 278 | end |
315 | 279 | ||
@@ -446,6 +410,43 @@ module Searchkick | @@ -446,6 +410,43 @@ module Searchkick | ||
446 | end | 410 | end |
447 | end | 411 | end |
448 | 412 | ||
413 | + def full_reindex_async(scope) | ||
414 | + if scope.respond_to?(:primary_key) | ||
415 | + # TODO expire Redis key | ||
416 | + primary_key = scope.primary_key | ||
417 | + starting_id = scope.minimum(primary_key) || 0 | ||
418 | + max_id = scope.maximum(primary_key) || 0 | ||
419 | + batches_count = ((max_id - starting_id + 1) / batch_size.to_f).ceil | ||
420 | + | ||
421 | + batches_count.times do |i| | ||
422 | + batch_id = i + 1 | ||
423 | + min_id = starting_id + (i * batch_size) | ||
424 | + bulk_reindex_job scope, batch_id, min_id: min_id, max_id: min_id + batch_size - 1 | ||
425 | + end | ||
426 | + else | ||
427 | + batch_id = 1 | ||
428 | + scope = scope.only(:_id) if scope.respond_to?(:only) | ||
429 | + each_batch(scope) do |items| | ||
430 | + bulk_reindex_job scope, batch_id, record_ids: items.map { |i| i.id.to_s } | ||
431 | + batch_id += 1 | ||
432 | + end | ||
433 | + end | ||
434 | + end | ||
435 | + | ||
436 | + def each_batch(scope) | ||
437 | + # https://github.com/karmi/tire/blob/master/lib/tire/model/import.rb | ||
438 | + # use cursor for Mongoid | ||
439 | + items = [] | ||
440 | + scope.all.each do |item| | ||
441 | + items << item | ||
442 | + if items.length == batch_size | ||
443 | + yield items | ||
444 | + items = [] | ||
445 | + end | ||
446 | + end | ||
447 | + yield items if items.any? | ||
448 | + end | ||
449 | + | ||
449 | def bulk_reindex_job(scope, batch_id, options) | 450 | def bulk_reindex_job(scope, batch_id, options) |
450 | Searchkick::BulkReindexJob.perform_later({ | 451 | Searchkick::BulkReindexJob.perform_later({ |
451 | class_name: scope.model_name.name, | 452 | class_name: scope.model_name.name, |
@@ -455,6 +456,10 @@ module Searchkick | @@ -455,6 +456,10 @@ module Searchkick | ||
455 | redis.sadd(batches_key, batch_id) if redis | 456 | redis.sadd(batches_key, batch_id) if redis |
456 | end | 457 | end |
457 | 458 | ||
459 | + def batch_size | ||
460 | + @batch_size ||= @options[:batch_size] || 1000 | ||
461 | + end | ||
462 | + | ||
458 | def with_retries | 463 | def with_retries |
459 | retries = 0 | 464 | retries = 0 |
460 | 465 |