Commit cadebb9f18e004dbd6f2aca46512fdf47b8d9b3a

Authored by Andrew Kane
1 parent 61ea667d

Push multiple records to reindex queue [skip ci]

lib/searchkick/bulk_indexer.rb
... ... @@ -11,6 +11,28 @@ module Searchkick
11 11 Searchkick.with_redis { |r| r.srem(batches_key, batch_id) } if batch_id
12 12 end
13 13  
  14 + def reindex_queue(records, method_name:)
  15 + if method_name
  16 + raise Searchkick::Error, "Partial reindex not supported with queue option"
  17 + end
  18 +
  19 + record_ids =
  20 + records.map do |record|
  21 + # always pass routing in case record is deleted
  22 + # before the queue job runs
  23 + if record.respond_to?(:search_routing)
  24 + routing = record.search_routing
  25 + end
  26 +
  27 + # escape pipe with double pipe
  28 + value = queue_escape(record.id.to_s)
  29 + value = "#{value}|#{queue_escape(routing)}" if routing
  30 + value
  31 + end
  32 +
  33 + index.reindex_queue.push(record_ids)
  34 + end
  35 +
14 36 def import_scope(relation, resume: false, method_name: nil, async: false, full: false, scope: nil, mode: nil)
15 37 if scope
16 38 relation = relation.send(scope)
... ... @@ -60,10 +82,7 @@ module Searchkick
60 82 method_name: method_name ? method_name.to_s : nil
61 83 )
62 84 when :queue
63   - # TODO push in single Redis call
64   - records.each do |record|
65   - RecordIndexer.new(record).reindex(method_name, mode: mode)
66   - end
  85 + reindex_queue(records, method_name: method_name)
67 86 when true, :inline
68 87 index_records, other_records = records.partition(&:should_index?)
69 88  
... ... @@ -177,5 +196,9 @@ module Searchkick
177 196 def batch_size
178 197 @batch_size ||= index.options[:batch_size] || 1000
179 198 end
  199 +
  200 + def queue_escape(value)
  201 + value.gsub("|", "||")
  202 + end
180 203 end
181 204 end
... ...
lib/searchkick/record_indexer.rb
... ... @@ -16,20 +16,7 @@ module Searchkick
16 16  
17 17 case mode
18 18 when :queue
19   - if method_name
20   - raise Searchkick::Error, "Partial reindex not supported with queue option"
21   - end
22   -
23   - # always pass routing in case record is deleted
24   - # before the queue job runs
25   - if record.respond_to?(:search_routing)
26   - routing = record.search_routing
27   - end
28   -
29   - # escape pipe with double pipe
30   - value = queue_escape(record.id.to_s)
31   - value = "#{value}|#{queue_escape(routing)}" if routing
32   - index.reindex_queue.push(value)
  19 + index.send(:bulk_indexer).reindex_queue([record], method_name: method_name)
33 20 when :async
34 21 unless defined?(ActiveJob)
35 22 raise Searchkick::Error, "Active Job not found"
... ... @@ -56,10 +43,6 @@ module Searchkick
56 43  
57 44 private
58 45  
59   - def queue_escape(value)
60   - value.gsub("|", "||")
61   - end
62   -
63 46 def reindex_record(method_name)
64 47 if record.destroyed? || !record.persisted? || !record.should_index?
65 48 begin
... ...
lib/searchkick/reindex_queue.rb
... ... @@ -8,6 +8,7 @@ module Searchkick
8 8 raise Searchkick::Error, "Searchkick.redis not set" unless Searchkick.redis
9 9 end
10 10  
  11 + # supports both single id and array
11 12 def push(record_id)
12 13 Searchkick.with_redis { |r| r.lpush(redis_key, record_id) }
13 14 end
... ...