diff --git a/lib/searchkick/bulk_indexer.rb b/lib/searchkick/bulk_indexer.rb index 1df2488..add8586 100644 --- a/lib/searchkick/bulk_indexer.rb +++ b/lib/searchkick/bulk_indexer.rb @@ -11,28 +11,6 @@ module Searchkick Searchkick.with_redis { |r| r.srem(batches_key, batch_id) } if batch_id end - def reindex_queue(records, method_name:) - if method_name - raise Searchkick::Error, "Partial reindex not supported with queue option" - end - - record_ids = - records.map do |record| - # always pass routing in case record is deleted - # before the queue job runs - if record.respond_to?(:search_routing) - routing = record.search_routing - end - - # escape pipe with double pipe - value = queue_escape(record.id.to_s) - value = "#{value}|#{queue_escape(routing)}" if routing - value - end - - index.reindex_queue.push(record_ids) - end - def import_scope(relation, resume: false, method_name: nil, async: false, full: false, scope: nil, mode: nil) if scope relation = relation.send(scope) @@ -82,7 +60,11 @@ module Searchkick method_name: method_name ? method_name.to_s : nil ) when :queue - reindex_queue(records, method_name: method_name) + if method_name + raise Searchkick::Error, "Partial reindex not supported with queue option" + end + + index.reindex_queue.push_records(records) when true, :inline index_records, other_records = records.partition(&:should_index?) @@ -196,9 +178,5 @@ module Searchkick def batch_size @batch_size ||= index.options[:batch_size] || 1000 end - - def queue_escape(value) - value.gsub("|", "||") - end end end diff --git a/lib/searchkick/record_indexer.rb b/lib/searchkick/record_indexer.rb index 3b93335..38248c1 100644 --- a/lib/searchkick/record_indexer.rb +++ b/lib/searchkick/record_indexer.rb @@ -16,7 +16,11 @@ module Searchkick case mode when :queue - index.send(:bulk_indexer).reindex_queue([record], method_name: method_name) + if method_name + raise Searchkick::Error, "Partial reindex not supported with queue option" + end + + index.reindex_queue.push_records([record]) when :async unless defined?(ActiveJob) raise Searchkick::Error, "Active Job not found" diff --git a/lib/searchkick/reindex_queue.rb b/lib/searchkick/reindex_queue.rb index a9fa38b..91f45fd 100644 --- a/lib/searchkick/reindex_queue.rb +++ b/lib/searchkick/reindex_queue.rb @@ -8,9 +8,27 @@ module Searchkick raise Searchkick::Error, "Searchkick.redis not set" unless Searchkick.redis end - # supports both single id and array - def push(record_id) - Searchkick.with_redis { |r| r.lpush(redis_key, record_id) } + # supports single and multiple ids + def push(record_ids) + Searchkick.with_redis { |r| r.lpush(redis_key, record_ids) } + end + + def push_records(records) + record_ids = + records.map do |record| + # always pass routing in case record is deleted + # before the queue job runs + if record.respond_to?(:search_routing) + routing = record.search_routing + end + + # escape pipe with double pipe + value = escape(record.id.to_s) + value = "#{value}|#{escape(routing)}" if routing + value + end + + push(record_ids) end # TODO use reliable queuing @@ -49,5 +67,9 @@ module Searchkick def redis_version @redis_version ||= Searchkick.with_redis { |r| Gem::Version.new(r.info["redis_version"]) } end + + def escape(value) + value.gsub("|", "||") + end end end -- libgit2 0.21.0