diff --git a/lib/searchkick.rb b/lib/searchkick.rb index 11fe58d..92f8239 100644 --- a/lib/searchkick.rb +++ b/lib/searchkick.rb @@ -205,17 +205,20 @@ module Searchkick end # message is private - def self.callbacks(value = nil, message: "Bulk") + def self.callbacks(value = nil, message: nil) if block_given? previous_value = callbacks_value begin self.callbacks_value = value result = yield - if callbacks_value == :bulk - event = { - name: message, - count: indexer.queued_items.size - } + if callbacks_value == :bulk && indexer.queued_items.any? + event = {} + if message + message.call(event) + else + event[:name] = "Bulk" + event[:count] = indexer.queued_items.size + end ActiveSupport::Notifications.instrument("request.searchkick", event) do indexer.perform end diff --git a/lib/searchkick/record_indexer.rb b/lib/searchkick/record_indexer.rb index a2266c7..fa82d9b 100644 --- a/lib/searchkick/record_indexer.rb +++ b/lib/searchkick/record_indexer.rb @@ -47,13 +47,13 @@ module Searchkick index.reindex_queue.push_records(records) when true, :inline index_records, other_records = records.partition { |r| index_record?(r) } - import_inline(index_records, !full ? other_records : [], method_name: method_name) + import_inline(index_records, !full ? other_records : [], method_name: method_name, single: single) else raise ArgumentError, "Invalid value for mode" end end - def reindex_items(klass, items, method_name:) + def reindex_items(klass, items, method_name:, single: false) routing = items.to_h { |r| [r[:id], r[:routing]] } record_ids = routing.keys @@ -68,7 +68,7 @@ module Searchkick construct_record(klass, id, routing[id]) end - import_inline(records, delete_records, method_name: method_name) + import_inline(records, delete_records, method_name: method_name, single: false) end private @@ -78,23 +78,51 @@ module Searchkick end # import in single request with retries - def import_inline(index_records, delete_records, method_name:) + def import_inline(index_records, delete_records, method_name:, single:) return if index_records.empty? && delete_records.empty? - action = method_name ? "Update" : "Import" - name = (index_records.first || delete_records.first).searchkick_klass.name - with_retries do - Searchkick.callbacks(:bulk, message: "#{name} #{action}") do - if index_records.any? - if method_name - index.bulk_update(index_records, method_name) - else - index.bulk_index(index_records) - end + maybe_bulk(index_records, delete_records, method_name, single) do + if index_records.any? + if method_name + index.bulk_update(index_records, method_name) + else + index.bulk_index(index_records) end + end + + if delete_records.any? + index.bulk_delete(delete_records) + end + end + end + + def maybe_bulk(index_records, delete_records, method_name, single) + if Searchkick.callbacks_value == :bulk + yield + else + # set action and data + action = + if single && index_records.empty? + "Remove" + elsif method_name + "Update" + else + single ? "Store" : "Import" + end + record = index_records.first || delete_records.first + name = record.searchkick_klass.name + message = lambda do |event| + event[:name] = "#{name} #{action}" + if single + event[:id] = index.search_id(record) + else + event[:count] = index_records.size + delete_records.size + end + end - if delete_records.any? - index.bulk_delete(delete_records) + with_retries do + Searchkick.callbacks(:bulk, message: message) do + yield end end end diff --git a/lib/searchkick/reindex_v2_job.rb b/lib/searchkick/reindex_v2_job.rb index affef69..8130e61 100644 --- a/lib/searchkick/reindex_v2_job.rb +++ b/lib/searchkick/reindex_v2_job.rb @@ -7,8 +7,7 @@ module Searchkick # may not be needed if calling search_import later model = model.unscoped if model.respond_to?(:unscoped) items = [{id: id, routing: routing}] - # TODO improve notification - model.searchkick_index.send(:record_indexer).reindex_items(model, items, method_name: method_name) + model.searchkick_index.send(:record_indexer).reindex_items(model, items, method_name: method_name, single: true) end end end -- libgit2 0.21.0