Commit 178dca9d14e2b287ae9b2d0d49651f7eabaa0fae

Authored by Andrew Kane
1 parent cadebb9f

Added push_records to ReindexQueue [skip ci]

lib/searchkick/bulk_indexer.rb
... ... @@ -11,28 +11,6 @@ module Searchkick
11 11 Searchkick.with_redis { |r| r.srem(batches_key, batch_id) } if batch_id
12 12 end
13 13  
14   - def reindex_queue(records, method_name:)
15   - if method_name
16   - raise Searchkick::Error, "Partial reindex not supported with queue option"
17   - end
18   -
19   - record_ids =
20   - records.map do |record|
21   - # always pass routing in case record is deleted
22   - # before the queue job runs
23   - if record.respond_to?(:search_routing)
24   - routing = record.search_routing
25   - end
26   -
27   - # escape pipe with double pipe
28   - value = queue_escape(record.id.to_s)
29   - value = "#{value}|#{queue_escape(routing)}" if routing
30   - value
31   - end
32   -
33   - index.reindex_queue.push(record_ids)
34   - end
35   -
36 14 def import_scope(relation, resume: false, method_name: nil, async: false, full: false, scope: nil, mode: nil)
37 15 if scope
38 16 relation = relation.send(scope)
... ... @@ -82,7 +60,11 @@ module Searchkick
82 60 method_name: method_name ? method_name.to_s : nil
83 61 )
84 62 when :queue
85   - reindex_queue(records, method_name: method_name)
  63 + if method_name
  64 + raise Searchkick::Error, "Partial reindex not supported with queue option"
  65 + end
  66 +
  67 + index.reindex_queue.push_records(records)
86 68 when true, :inline
87 69 index_records, other_records = records.partition(&:should_index?)
88 70  
... ... @@ -196,9 +178,5 @@ module Searchkick
196 178 def batch_size
197 179 @batch_size ||= index.options[:batch_size] || 1000
198 180 end
199   -
200   - def queue_escape(value)
201   - value.gsub("|", "||")
202   - end
203 181 end
204 182 end
... ...
lib/searchkick/record_indexer.rb
... ... @@ -16,7 +16,11 @@ module Searchkick
16 16  
17 17 case mode
18 18 when :queue
19   - index.send(:bulk_indexer).reindex_queue([record], method_name: method_name)
  19 + if method_name
  20 + raise Searchkick::Error, "Partial reindex not supported with queue option"
  21 + end
  22 +
  23 + index.reindex_queue.push_records([record])
20 24 when :async
21 25 unless defined?(ActiveJob)
22 26 raise Searchkick::Error, "Active Job not found"
... ...
lib/searchkick/reindex_queue.rb
... ... @@ -8,9 +8,27 @@ module Searchkick
8 8 raise Searchkick::Error, "Searchkick.redis not set" unless Searchkick.redis
9 9 end
10 10  
11   - # supports both single id and array
12   - def push(record_id)
13   - Searchkick.with_redis { |r| r.lpush(redis_key, record_id) }
  11 + # supports single and multiple ids
  12 + def push(record_ids)
  13 + Searchkick.with_redis { |r| r.lpush(redis_key, record_ids) }
  14 + end
  15 +
  16 + def push_records(records)
  17 + record_ids =
  18 + records.map do |record|
  19 + # always pass routing in case record is deleted
  20 + # before the queue job runs
  21 + if record.respond_to?(:search_routing)
  22 + routing = record.search_routing
  23 + end
  24 +
  25 + # escape pipe with double pipe
  26 + value = escape(record.id.to_s)
  27 + value = "#{value}|#{escape(routing)}" if routing
  28 + value
  29 + end
  30 +
  31 + push(record_ids)
14 32 end
15 33  
16 34 # TODO use reliable queuing
... ... @@ -49,5 +67,9 @@ module Searchkick
49 67 def redis_version
50 68 @redis_version ||= Searchkick.with_redis { |r| Gem::Version.new(r.info["redis_version"]) }
51 69 end
  70 +
  71 + def escape(value)
  72 + value.gsub("|", "||")
  73 + end
52 74 end
53 75 end
... ...