diff --git a/lib/searchkick/model.rb b/lib/searchkick/model.rb index 8f29fe1..7c2cf01 100644 --- a/lib/searchkick/model.rb +++ b/lib/searchkick/model.rb @@ -76,7 +76,7 @@ module Searchkick end def reindex(method_name = nil, **options) - RecordIndexer.new(self).reindex(method_name, **options) + RecordIndexer.new(self.class.searchkick_index).reindex_records([self], method_name: method_name, **options) end unless method_defined?(:reindex) # TODO switch to keyword arguments diff --git a/lib/searchkick/record_indexer.rb b/lib/searchkick/record_indexer.rb index 7b5842e..38c2b83 100644 --- a/lib/searchkick/record_indexer.rb +++ b/lib/searchkick/record_indexer.rb @@ -1,17 +1,17 @@ module Searchkick class RecordIndexer - attr_reader :record, :index + attr_reader :index - def initialize(record) - @record = record - @index = record.class.searchkick_index + def initialize(index) + @index = index end - def reindex(method_name = nil, refresh: false, mode: nil) + def reindex_records(records, mode: nil, method_name: nil, refresh: false) unless [:inline, true, nil, :async, :queue].include?(mode) raise ArgumentError, "Invalid value for mode" end + # check afterwards for bulk mode ||= Searchkick.callbacks_value || index.options[:callbacks] || true case mode @@ -20,35 +20,59 @@ module Searchkick raise Searchkick::Error, "Partial reindex not supported with queue option" end - # always pass routing in case record is deleted - # before the queue job runs - if record.respond_to?(:search_routing) - routing = record.search_routing - end + record_ids = + records.map do |record| + # always pass routing in case record is deleted + # before the queue job runs + if record.respond_to?(:search_routing) + routing = record.search_routing + end + + # escape pipe with double pipe + value = queue_escape(record.id.to_s) + value = "#{value}|#{queue_escape(routing)}" if routing + value + end - # escape pipe with double pipe - value = queue_escape(record.id.to_s) - value = "#{value}|#{queue_escape(routing)}" if routing - index.reindex_queue.push(value) + index.reindex_queue.push(*record_ids) when :async unless defined?(ActiveJob) raise Searchkick::Error, "Active Job not found" end - # always pass routing in case record is deleted - # before the async job runs - if record.respond_to?(:search_routing) - routing = record.search_routing - end + # TODO use single job + records.each do |record| + # always pass routing in case record is deleted + # before the async job runs + if record.respond_to?(:search_routing) + routing = record.search_routing + end - Searchkick::ReindexV2Job.perform_later( - record.class.name, - record.id.to_s, - method_name ? method_name.to_s : nil, - routing: routing - ) + Searchkick::ReindexV2Job.perform_later( + record.class.name, + record.id.to_s, + method_name ? method_name.to_s : nil, + routing: routing + ) + end else # bulk, inline/true/nil - reindex_record(method_name) + delete_records, index_records = records.partition { |r| r.destroyed? || !r.persisted? || !r.should_index? } + + # TODO use + # index.bulk_delete(delete_records) + delete_records.each do |record| + begin + index.remove(record) + rescue Elasticsearch::Transport::Transport::Errors::NotFound + # do nothing + end + end + + if method_name + index.bulk_update(index_records, method_name) + else + index.bulk_index(index_records) + end index.refresh if refresh end @@ -59,21 +83,5 @@ module Searchkick def queue_escape(value) value.gsub("|", "||") end - - def reindex_record(method_name) - if record.destroyed? || !record.persisted? || !record.should_index? - begin - index.remove(record) - rescue Elasticsearch::Transport::Transport::Errors::NotFound - # do nothing - end - else - if method_name - index.update_record(record, method_name) - else - index.store(record) - end - end - end end end diff --git a/lib/searchkick/reindex_queue.rb b/lib/searchkick/reindex_queue.rb index b806b5b..4702001 100644 --- a/lib/searchkick/reindex_queue.rb +++ b/lib/searchkick/reindex_queue.rb @@ -8,8 +8,8 @@ module Searchkick raise Searchkick::Error, "Searchkick.redis not set" unless Searchkick.redis end - def push(record_id) - Searchkick.with_redis { |r| r.lpush(redis_key, record_id) } + def push(*record_ids) + Searchkick.with_redis { |r| r.lpush(redis_key, *record_ids) } end # TODO use reliable queuing diff --git a/lib/searchkick/reindex_v2_job.rb b/lib/searchkick/reindex_v2_job.rb index 03c8bb8..ab104d2 100644 --- a/lib/searchkick/reindex_v2_job.rb +++ b/lib/searchkick/reindex_v2_job.rb @@ -35,7 +35,7 @@ module Searchkick end end - RecordIndexer.new(record).reindex(method_name, mode: :inline) + RecordIndexer.new(model.searchkick_index).reindex_records([record], method_name: method_name, mode: :inline) end end end -- libgit2 0.21.0