diff --git a/lib/searchkick.rb b/lib/searchkick.rb index 9fcf574..11fe58d 100644 --- a/lib/searchkick.rb +++ b/lib/searchkick.rb @@ -10,7 +10,6 @@ require "hashie" require "forwardable" # modules -require "searchkick/bulk_record_indexer" require "searchkick/controller_runtime" require "searchkick/index" require "searchkick/index_options" diff --git a/lib/searchkick/bulk_record_indexer.rb b/lib/searchkick/bulk_record_indexer.rb deleted file mode 100644 index b4472b5..0000000 --- a/lib/searchkick/bulk_record_indexer.rb +++ /dev/null @@ -1,128 +0,0 @@ -module Searchkick - class BulkRecordIndexer - attr_reader :index - - def initialize(index) - @index = index - end - - def reindex(records, mode:, method_name:, full: false, single: false) - return if records.empty? - - case mode - when :async - unless defined?(ActiveJob) - raise Searchkick::Error, "Active Job not found" - end - - # temporary hack - if single - record = records.first - - # always pass routing in case record is deleted - # before the async job runs - if record.respond_to?(:search_routing) - routing = record.search_routing - end - - Searchkick::ReindexV2Job.perform_later( - record.class.name, - record.id.to_s, - method_name ? method_name.to_s : nil, - routing: routing - ) - else - Searchkick::BulkReindexJob.perform_later( - class_name: records.first.class.searchkick_options[:class_name], - record_ids: records.map(&:id), - index_name: index.name, - method_name: method_name ? method_name.to_s : nil - ) - end - when :queue - if method_name - raise Searchkick::Error, "Partial reindex not supported with queue option" - end - - index.reindex_queue.push_records(records) - when true, :inline - index_records, other_records = records.partition { |r| index_record?(r) } - import_inline(index_records, !full ? other_records : [], method_name: method_name) - else - raise ArgumentError, "Invalid value for mode" - end - end - - def reindex_items(klass, items, method_name:) - routing = items.to_h { |r| [r[:id], r[:routing]] } - record_ids = routing.keys - - scope = Searchkick.load_records(klass, record_ids) - # scope = scope.search_import if scope.respond_to?(:search_import) - records = scope.select(&:should_index?) - - # determine which records to delete - delete_ids = record_ids - records.map { |r| r.id.to_s } - delete_records = - delete_ids.map do |id| - construct_record(klass, id, routing[id]) - end - - import_inline(records, delete_records, method_name: method_name) - end - - private - - def index_record?(record) - record.persisted? && !record.destroyed? && record.should_index? - end - - # import in single request with retries - def import_inline(index_records, delete_records, method_name:) - return if index_records.empty? && delete_records.empty? - - action = method_name ? "Update" : "Import" - name = (index_records.first || delete_records.first).searchkick_klass.name - with_retries do - Searchkick.callbacks(:bulk, message: "#{name} #{action}") do - if index_records.any? - if method_name - index.bulk_update(index_records, method_name) - else - index.bulk_index(index_records) - end - end - - if delete_records.any? - index.bulk_delete(delete_records) - end - end - end - end - - def construct_record(klass, id, routing) - record = klass.new - record.id = id - if routing - record.define_singleton_method(:search_routing) do - routing - end - end - record - end - - def with_retries - retries = 0 - - begin - yield - rescue Faraday::ClientError => e - if retries < 1 - retries += 1 - retry - end - raise e - end - end - end -end diff --git a/lib/searchkick/bulk_reindex_job.rb b/lib/searchkick/bulk_reindex_job.rb index 3cd0a0d..0cef572 100644 --- a/lib/searchkick/bulk_reindex_job.rb +++ b/lib/searchkick/bulk_reindex_job.rb @@ -9,7 +9,7 @@ module Searchkick records = Searchkick.load_records(klass, record_ids) # TODO expose functionality on index - index.send(:bulk_record_indexer).reindex(records, mode: :inline, method_name: method_name, full: false) + index.send(:record_indexer).reindex(records, mode: :inline, method_name: method_name, full: false) index.send(:relation_indexer).batch_completed(batch_id) if batch_id end end diff --git a/lib/searchkick/index.rb b/lib/searchkick/index.rb index edc4939..445458d 100644 --- a/lib/searchkick/index.rb +++ b/lib/searchkick/index.rb @@ -302,8 +302,8 @@ module Searchkick @relation_indexer ||= RelationIndexer.new(self) end - def bulk_record_indexer - @bulk_record_indexer ||= BulkRecordIndexer.new(self) + def record_indexer + @record_indexer ||= RecordIndexer.new(self) end def index_settings diff --git a/lib/searchkick/model.rb b/lib/searchkick/model.rb index 4d93fd2..b7b04c0 100644 --- a/lib/searchkick/model.rb +++ b/lib/searchkick/model.rb @@ -29,7 +29,7 @@ module Searchkick def reindex(method_name = nil, mode: nil, refresh: false) mode ||= Searchkick.callbacks_value || self.class.searchkick_index.options[:callbacks] || true mode = :inline if mode == :bulk - result = self.class.searchkick_index.send(:bulk_record_indexer).reindex([self], mode: mode, method_name: method_name, single: true) + result = self.class.searchkick_index.send(:record_indexer).reindex([self], mode: mode, method_name: method_name, single: true) self.class.searchkick_index.refresh if refresh result end diff --git a/lib/searchkick/process_batch_job.rb b/lib/searchkick/process_batch_job.rb index 84c0db5..42c13b3 100644 --- a/lib/searchkick/process_batch_job.rb +++ b/lib/searchkick/process_batch_job.rb @@ -15,7 +15,7 @@ module Searchkick relation = klass relation = relation.search_import if relation.respond_to?(:search_import) - index.send(:bulk_record_indexer).reindex_items(relation, items, method_name: nil) + index.send(:record_indexer).reindex_items(relation, items, method_name: nil) end end end diff --git a/lib/searchkick/record_indexer.rb b/lib/searchkick/record_indexer.rb index 38248c1..a2266c7 100644 --- a/lib/searchkick/record_indexer.rb +++ b/lib/searchkick/record_indexer.rb @@ -1,66 +1,127 @@ module Searchkick class RecordIndexer - attr_reader :record, :index + attr_reader :index - def initialize(record) - @record = record - @index = record.class.searchkick_index + def initialize(index) + @index = index end - def reindex(method_name = nil, refresh: false, mode: nil) - unless [:inline, true, nil, :async, :queue].include?(mode) - raise ArgumentError, "Invalid value for mode" - end - - mode ||= Searchkick.callbacks_value || index.options[:callbacks] || true + def reindex(records, mode:, method_name:, full: false, single: false) + return if records.empty? case mode - when :queue - if method_name - raise Searchkick::Error, "Partial reindex not supported with queue option" - end - - index.reindex_queue.push_records([record]) when :async unless defined?(ActiveJob) raise Searchkick::Error, "Active Job not found" end - # always pass routing in case record is deleted - # before the async job runs - if record.respond_to?(:search_routing) - routing = record.search_routing - end + # temporary hack + if single + record = records.first + + # always pass routing in case record is deleted + # before the async job runs + if record.respond_to?(:search_routing) + routing = record.search_routing + end - Searchkick::ReindexV2Job.perform_later( - record.class.name, - record.id.to_s, - method_name ? method_name.to_s : nil, - routing: routing - ) - else # bulk, inline/true/nil - reindex_record(method_name) + Searchkick::ReindexV2Job.perform_later( + record.class.name, + record.id.to_s, + method_name ? method_name.to_s : nil, + routing: routing + ) + else + Searchkick::BulkReindexJob.perform_later( + class_name: records.first.class.searchkick_options[:class_name], + record_ids: records.map(&:id), + index_name: index.name, + method_name: method_name ? method_name.to_s : nil + ) + end + when :queue + if method_name + raise Searchkick::Error, "Partial reindex not supported with queue option" + end - index.refresh if refresh + index.reindex_queue.push_records(records) + when true, :inline + index_records, other_records = records.partition { |r| index_record?(r) } + import_inline(index_records, !full ? other_records : [], method_name: method_name) + else + raise ArgumentError, "Invalid value for mode" end end + def reindex_items(klass, items, method_name:) + routing = items.to_h { |r| [r[:id], r[:routing]] } + record_ids = routing.keys + + scope = Searchkick.load_records(klass, record_ids) + # scope = scope.search_import if scope.respond_to?(:search_import) + records = scope.select(&:should_index?) + + # determine which records to delete + delete_ids = record_ids - records.map { |r| r.id.to_s } + delete_records = + delete_ids.map do |id| + construct_record(klass, id, routing[id]) + end + + import_inline(records, delete_records, method_name: method_name) + end + private - def reindex_record(method_name) - if record.destroyed? || !record.persisted? || !record.should_index? - begin - index.remove(record) - rescue => e - raise e unless Searchkick.not_found_error?(e) - # do nothing if not found + def index_record?(record) + record.persisted? && !record.destroyed? && record.should_index? + end + + # import in single request with retries + def import_inline(index_records, delete_records, method_name:) + return if index_records.empty? && delete_records.empty? + + action = method_name ? "Update" : "Import" + name = (index_records.first || delete_records.first).searchkick_klass.name + with_retries do + Searchkick.callbacks(:bulk, message: "#{name} #{action}") do + if index_records.any? + if method_name + index.bulk_update(index_records, method_name) + else + index.bulk_index(index_records) + end + end + + if delete_records.any? + index.bulk_delete(delete_records) + end end - else - if method_name - index.update_record(record, method_name) - else - index.store(record) + end + end + + def construct_record(klass, id, routing) + record = klass.new + record.id = id + if routing + record.define_singleton_method(:search_routing) do + routing + end + end + record + end + + def with_retries + retries = 0 + + begin + yield + rescue Faraday::ClientError => e + if retries < 1 + retries += 1 + retry end + raise e end end end diff --git a/lib/searchkick/reindex_v2_job.rb b/lib/searchkick/reindex_v2_job.rb index 5714160..affef69 100644 --- a/lib/searchkick/reindex_v2_job.rb +++ b/lib/searchkick/reindex_v2_job.rb @@ -8,7 +8,7 @@ module Searchkick model = model.unscoped if model.respond_to?(:unscoped) items = [{id: id, routing: routing}] # TODO improve notification - model.searchkick_index.send(:bulk_record_indexer).reindex_items(model, items, method_name: method_name) + model.searchkick_index.send(:record_indexer).reindex_items(model, items, method_name: method_name) end end end diff --git a/lib/searchkick/relation_indexer.rb b/lib/searchkick/relation_indexer.rb index bad16eb..f9f5bfb 100644 --- a/lib/searchkick/relation_indexer.rb +++ b/lib/searchkick/relation_indexer.rb @@ -49,7 +49,7 @@ module Searchkick private def import_or_update(records, method_name, mode, full) - bulk_record_indexer.reindex( + record_indexer.reindex( records, mode: mode, method_name: method_name, @@ -131,8 +131,8 @@ module Searchkick @batch_size ||= index.options[:batch_size] || 1000 end - def bulk_record_indexer - @bulk_record_indexer ||= index.send(:bulk_record_indexer) + def record_indexer + @record_indexer ||= index.send(:record_indexer) end end end -- libgit2 0.21.0