Commit ac938b363e613acae46b3424b2bb040bc33c6abf

Authored by Andrew Kane
1 parent d92c36f5

Seperate ProcessBatchJob again for cleaner code

lib/searchkick/bulk_reindex_job.rb
... ... @@ -2,7 +2,7 @@ module Searchkick
2 2 class BulkReindexJob < ActiveJob::Base
3 3 queue_as :searchkick
4 4  
5   - def perform(class_name:, record_ids: nil, index_name: nil, method_name: nil, batch_id: nil, min_id: nil, max_id: nil, delete_missing: false)
  5 + def perform(class_name:, record_ids: nil, index_name: nil, method_name: nil, batch_id: nil, min_id: nil, max_id: nil)
6 6 klass = class_name.constantize
7 7 index = index_name ? Searchkick::Index.new(index_name) : klass.searchkick_index
8 8 record_ids ||= min_id..max_id
... ... @@ -10,9 +10,7 @@ module Searchkick
10 10 Searchkick.load_records(klass, record_ids),
11 11 method_name: method_name,
12 12 batch: true,
13   - batch_id: batch_id,
14   - delete_missing: delete_missing,
15   - record_ids: record_ids
  13 + batch_id: batch_id
16 14 )
17 15 end
18 16 end
... ...
lib/searchkick/index.rb
... ... @@ -247,14 +247,14 @@ module Searchkick
247 247 end
248 248 end
249 249  
250   - def import_scope(scope, resume: false, method_name: nil, async: false, batch: false, batch_id: nil, full: false, delete_missing: false, record_ids: nil)
  250 + def import_scope(scope, resume: false, method_name: nil, async: false, batch: false, batch_id: nil, full: false)
251 251 batch_size = @options[:batch_size] || 1000
252 252  
253 253 # use scope for import
254 254 scope = scope.search_import if scope.respond_to?(:search_import)
255 255  
256 256 if batch
257   - import_or_update scope.to_a, method_name, async, delete_missing, record_ids, scope.model_name.name.constantize
  257 + import_or_update scope.to_a, method_name, async
258 258 redis.srem(batches_key, batch_id) if batch_id && redis
259 259 elsif full && async
260 260 if scope.respond_to?(:primary_key)
... ... @@ -422,7 +422,7 @@ module Searchkick
422 422 end
423 423 end
424 424  
425   - def import_or_update(records, method_name, async, delete_missing = false, record_ids = nil, klass = nil)
  425 + def import_or_update(records, method_name, async)
426 426 if records.any?
427 427 if async
428 428 Searchkick::BulkReindexJob.perform_later(
... ... @@ -433,22 +433,9 @@ module Searchkick
433 433 )
434 434 else
435 435 records = records.select(&:should_index?)
436   -
437   - delete_records =
438   - if delete_missing
439   - # determine which records to delete
440   - (record_ids - records.map { |r| r.id.to_s }).map { |id| m = klass.new; m.id = id; m }
441   - else
442   - []
443   - end
444   -
445   - with_retries do
446   - # bulk reindex
447   - possibly_bulk(delete_records.any?) do
448   - if records.any?
449   - method_name ? bulk_update(records, method_name) : import(records)
450   - end
451   - bulk_delete(delete_records) if delete_records.any?
  436 + if records.any?
  437 + with_retries do
  438 + method_name ? bulk_update(records, method_name) : import(records)
452 439 end
453 440 end
454 441 end
... ... @@ -473,18 +460,6 @@ module Searchkick
473 460 Searchkick.redis
474 461 end
475 462  
476   - # use bulk if no callbacks value set and deleted records
477   - # if no deleted records, we can show friendlier notifications
478   - def possibly_bulk(deleted_records)
479   - if Searchkick.callbacks_value || !deleted_records
480   - yield
481   - else
482   - Searchkick.callbacks(:bulk) do
483   - yield
484   - end
485   - end
486   - end
487   -
488 463 def batches_key
489 464 "searchkick:reindex:#{name}:batches"
490 465 end
... ...
lib/searchkick/process_batch_job.rb
... ... @@ -3,12 +3,21 @@ module Searchkick
3 3 queue_as :searchkick
4 4  
5 5 def perform(class_name:, record_ids:)
6   - # job deprecated
7   - Searchkick::BulkReindexJob.perform_now(
8   - class_name: class_name,
9   - record_ids: record_ids,
10   - delete_missing: true
11   - )
  6 + klass = class_name.constantize
  7 + scope = Searchkick.load_records(klass, record_ids)
  8 + scope = scope.search_import if scope.respond_to?(:search_import)
  9 + records = scope.select(&:should_index?)
  10 +
  11 + # determine which records to delete
  12 + delete_ids = record_ids - records.map { |r| r.id.to_s }
  13 + delete_records = delete_ids.map { |id| m = klass.new; m.id = id; m }
  14 +
  15 + # bulk reindex
  16 + index = klass.searchkick_index
  17 + Searchkick.callbacks(:bulk) do
  18 + index.bulk_index(records)
  19 + index.bulk_delete(delete_records)
  20 + end
12 21 end
13 22 end
14 23 end
... ...
lib/searchkick/process_queue_job.rb
... ... @@ -8,10 +8,9 @@ module Searchkick
8 8 limit = model.searchkick_index.options[:batch_size] || 1000
9 9 record_ids = Searchkick::ReindexQueue.new(model.searchkick_index.name).reserve(limit: limit)
10 10 if record_ids.any?
11   - Searchkick::BulkReindexJob.perform_later(
  11 + Searchkick::ProcessBatchJob.perform_later(
12 12 class_name: model.name,
13   - record_ids: record_ids,
14   - delete_missing: true
  13 + record_ids: record_ids
15 14 )
16 15 # TODO when moving to reliable queuing, mark as complete
17 16  
... ...