Commit 619b2a3ba2a9c7da739e56da2cb8a57010f436c5

Authored by Andrew Kane
1 parent fa251bc8

Improved reindexing and notifications [skip ci]

lib/searchkick.rb
@@ -205,17 +205,20 @@ module Searchkick @@ -205,17 +205,20 @@ module Searchkick
205 end 205 end
206 206
207 # message is private 207 # message is private
208 - def self.callbacks(value = nil, message: "Bulk") 208 + def self.callbacks(value = nil, message: nil)
209 if block_given? 209 if block_given?
210 previous_value = callbacks_value 210 previous_value = callbacks_value
211 begin 211 begin
212 self.callbacks_value = value 212 self.callbacks_value = value
213 result = yield 213 result = yield
214 - if callbacks_value == :bulk  
215 - event = {  
216 - name: message,  
217 - count: indexer.queued_items.size  
218 - } 214 + if callbacks_value == :bulk && indexer.queued_items.any?
  215 + event = {}
  216 + if message
  217 + message.call(event)
  218 + else
  219 + event[:name] = "Bulk"
  220 + event[:count] = indexer.queued_items.size
  221 + end
219 ActiveSupport::Notifications.instrument("request.searchkick", event) do 222 ActiveSupport::Notifications.instrument("request.searchkick", event) do
220 indexer.perform 223 indexer.perform
221 end 224 end
lib/searchkick/record_indexer.rb
@@ -47,13 +47,13 @@ module Searchkick @@ -47,13 +47,13 @@ module Searchkick
47 index.reindex_queue.push_records(records) 47 index.reindex_queue.push_records(records)
48 when true, :inline 48 when true, :inline
49 index_records, other_records = records.partition { |r| index_record?(r) } 49 index_records, other_records = records.partition { |r| index_record?(r) }
50 - import_inline(index_records, !full ? other_records : [], method_name: method_name) 50 + import_inline(index_records, !full ? other_records : [], method_name: method_name, single: single)
51 else 51 else
52 raise ArgumentError, "Invalid value for mode" 52 raise ArgumentError, "Invalid value for mode"
53 end 53 end
54 end 54 end
55 55
56 - def reindex_items(klass, items, method_name:) 56 + def reindex_items(klass, items, method_name:, single: false)
57 routing = items.to_h { |r| [r[:id], r[:routing]] } 57 routing = items.to_h { |r| [r[:id], r[:routing]] }
58 record_ids = routing.keys 58 record_ids = routing.keys
59 59
@@ -68,7 +68,7 @@ module Searchkick @@ -68,7 +68,7 @@ module Searchkick
68 construct_record(klass, id, routing[id]) 68 construct_record(klass, id, routing[id])
69 end 69 end
70 70
71 - import_inline(records, delete_records, method_name: method_name) 71 + import_inline(records, delete_records, method_name: method_name, single: false)
72 end 72 end
73 73
74 private 74 private
@@ -78,23 +78,51 @@ module Searchkick @@ -78,23 +78,51 @@ module Searchkick
78 end 78 end
79 79
80 # import in single request with retries 80 # import in single request with retries
81 - def import_inline(index_records, delete_records, method_name:) 81 + def import_inline(index_records, delete_records, method_name:, single:)
82 return if index_records.empty? && delete_records.empty? 82 return if index_records.empty? && delete_records.empty?
83 83
84 - action = method_name ? "Update" : "Import"  
85 - name = (index_records.first || delete_records.first).searchkick_klass.name  
86 - with_retries do  
87 - Searchkick.callbacks(:bulk, message: "#{name} #{action}") do  
88 - if index_records.any?  
89 - if method_name  
90 - index.bulk_update(index_records, method_name)  
91 - else  
92 - index.bulk_index(index_records)  
93 - end 84 + maybe_bulk(index_records, delete_records, method_name, single) do
  85 + if index_records.any?
  86 + if method_name
  87 + index.bulk_update(index_records, method_name)
  88 + else
  89 + index.bulk_index(index_records)
94 end 90 end
  91 + end
  92 +
  93 + if delete_records.any?
  94 + index.bulk_delete(delete_records)
  95 + end
  96 + end
  97 + end
  98 +
  99 + def maybe_bulk(index_records, delete_records, method_name, single)
  100 + if Searchkick.callbacks_value == :bulk
  101 + yield
  102 + else
  103 + # set action and data
  104 + action =
  105 + if single && index_records.empty?
  106 + "Remove"
  107 + elsif method_name
  108 + "Update"
  109 + else
  110 + single ? "Store" : "Import"
  111 + end
  112 + record = index_records.first || delete_records.first
  113 + name = record.searchkick_klass.name
  114 + message = lambda do |event|
  115 + event[:name] = "#{name} #{action}"
  116 + if single
  117 + event[:id] = index.search_id(record)
  118 + else
  119 + event[:count] = index_records.size + delete_records.size
  120 + end
  121 + end
95 122
96 - if delete_records.any?  
97 - index.bulk_delete(delete_records) 123 + with_retries do
  124 + Searchkick.callbacks(:bulk, message: message) do
  125 + yield
98 end 126 end
99 end 127 end
100 end 128 end
lib/searchkick/reindex_v2_job.rb
@@ -7,8 +7,7 @@ module Searchkick @@ -7,8 +7,7 @@ module Searchkick
7 # may not be needed if calling search_import later 7 # may not be needed if calling search_import later
8 model = model.unscoped if model.respond_to?(:unscoped) 8 model = model.unscoped if model.respond_to?(:unscoped)
9 items = [{id: id, routing: routing}] 9 items = [{id: id, routing: routing}]
10 - # TODO improve notification  
11 - model.searchkick_index.send(:record_indexer).reindex_items(model, items, method_name: method_name) 10 + model.searchkick_index.send(:record_indexer).reindex_items(model, items, method_name: method_name, single: true)
12 end 11 end
13 end 12 end
14 end 13 end