Commit fa251bc852e023dcd82340789986724a210cee66

Authored by Andrew Kane
1 parent f44940fa

Renamed BulkRecordIndexer to RecordIndexer [skip ci]

lib/searchkick.rb
@@ -10,7 +10,6 @@ require "hashie" @@ -10,7 +10,6 @@ require "hashie"
10 require "forwardable" 10 require "forwardable"
11 11
12 # modules 12 # modules
13 -require "searchkick/bulk_record_indexer"  
14 require "searchkick/controller_runtime" 13 require "searchkick/controller_runtime"
15 require "searchkick/index" 14 require "searchkick/index"
16 require "searchkick/index_options" 15 require "searchkick/index_options"
lib/searchkick/bulk_record_indexer.rb
@@ -1,128 +0,0 @@ @@ -1,128 +0,0 @@
1 -module Searchkick  
2 - class BulkRecordIndexer  
3 - attr_reader :index  
4 -  
5 - def initialize(index)  
6 - @index = index  
7 - end  
8 -  
9 - def reindex(records, mode:, method_name:, full: false, single: false)  
10 - return if records.empty?  
11 -  
12 - case mode  
13 - when :async  
14 - unless defined?(ActiveJob)  
15 - raise Searchkick::Error, "Active Job not found"  
16 - end  
17 -  
18 - # temporary hack  
19 - if single  
20 - record = records.first  
21 -  
22 - # always pass routing in case record is deleted  
23 - # before the async job runs  
24 - if record.respond_to?(:search_routing)  
25 - routing = record.search_routing  
26 - end  
27 -  
28 - Searchkick::ReindexV2Job.perform_later(  
29 - record.class.name,  
30 - record.id.to_s,  
31 - method_name ? method_name.to_s : nil,  
32 - routing: routing  
33 - )  
34 - else  
35 - Searchkick::BulkReindexJob.perform_later(  
36 - class_name: records.first.class.searchkick_options[:class_name],  
37 - record_ids: records.map(&:id),  
38 - index_name: index.name,  
39 - method_name: method_name ? method_name.to_s : nil  
40 - )  
41 - end  
42 - when :queue  
43 - if method_name  
44 - raise Searchkick::Error, "Partial reindex not supported with queue option"  
45 - end  
46 -  
47 - index.reindex_queue.push_records(records)  
48 - when true, :inline  
49 - index_records, other_records = records.partition { |r| index_record?(r) }  
50 - import_inline(index_records, !full ? other_records : [], method_name: method_name)  
51 - else  
52 - raise ArgumentError, "Invalid value for mode"  
53 - end  
54 - end  
55 -  
56 - def reindex_items(klass, items, method_name:)  
57 - routing = items.to_h { |r| [r[:id], r[:routing]] }  
58 - record_ids = routing.keys  
59 -  
60 - scope = Searchkick.load_records(klass, record_ids)  
61 - # scope = scope.search_import if scope.respond_to?(:search_import)  
62 - records = scope.select(&:should_index?)  
63 -  
64 - # determine which records to delete  
65 - delete_ids = record_ids - records.map { |r| r.id.to_s }  
66 - delete_records =  
67 - delete_ids.map do |id|  
68 - construct_record(klass, id, routing[id])  
69 - end  
70 -  
71 - import_inline(records, delete_records, method_name: method_name)  
72 - end  
73 -  
74 - private  
75 -  
76 - def index_record?(record)  
77 - record.persisted? && !record.destroyed? && record.should_index?  
78 - end  
79 -  
80 - # import in single request with retries  
81 - def import_inline(index_records, delete_records, method_name:)  
82 - return if index_records.empty? && delete_records.empty?  
83 -  
84 - action = method_name ? "Update" : "Import"  
85 - name = (index_records.first || delete_records.first).searchkick_klass.name  
86 - with_retries do  
87 - Searchkick.callbacks(:bulk, message: "#{name} #{action}") do  
88 - if index_records.any?  
89 - if method_name  
90 - index.bulk_update(index_records, method_name)  
91 - else  
92 - index.bulk_index(index_records)  
93 - end  
94 - end  
95 -  
96 - if delete_records.any?  
97 - index.bulk_delete(delete_records)  
98 - end  
99 - end  
100 - end  
101 - end  
102 -  
103 - def construct_record(klass, id, routing)  
104 - record = klass.new  
105 - record.id = id  
106 - if routing  
107 - record.define_singleton_method(:search_routing) do  
108 - routing  
109 - end  
110 - end  
111 - record  
112 - end  
113 -  
114 - def with_retries  
115 - retries = 0  
116 -  
117 - begin  
118 - yield  
119 - rescue Faraday::ClientError => e  
120 - if retries < 1  
121 - retries += 1  
122 - retry  
123 - end  
124 - raise e  
125 - end  
126 - end  
127 - end  
128 -end  
lib/searchkick/bulk_reindex_job.rb
@@ -9,7 +9,7 @@ module Searchkick @@ -9,7 +9,7 @@ module Searchkick
9 records = Searchkick.load_records(klass, record_ids) 9 records = Searchkick.load_records(klass, record_ids)
10 10
11 # TODO expose functionality on index 11 # TODO expose functionality on index
12 - index.send(:bulk_record_indexer).reindex(records, mode: :inline, method_name: method_name, full: false) 12 + index.send(:record_indexer).reindex(records, mode: :inline, method_name: method_name, full: false)
13 index.send(:relation_indexer).batch_completed(batch_id) if batch_id 13 index.send(:relation_indexer).batch_completed(batch_id) if batch_id
14 end 14 end
15 end 15 end
lib/searchkick/index.rb
@@ -302,8 +302,8 @@ module Searchkick @@ -302,8 +302,8 @@ module Searchkick
302 @relation_indexer ||= RelationIndexer.new(self) 302 @relation_indexer ||= RelationIndexer.new(self)
303 end 303 end
304 304
305 - def bulk_record_indexer  
306 - @bulk_record_indexer ||= BulkRecordIndexer.new(self) 305 + def record_indexer
  306 + @record_indexer ||= RecordIndexer.new(self)
307 end 307 end
308 308
309 def index_settings 309 def index_settings
lib/searchkick/model.rb
@@ -29,7 +29,7 @@ module Searchkick @@ -29,7 +29,7 @@ module Searchkick
29 def reindex(method_name = nil, mode: nil, refresh: false) 29 def reindex(method_name = nil, mode: nil, refresh: false)
30 mode ||= Searchkick.callbacks_value || self.class.searchkick_index.options[:callbacks] || true 30 mode ||= Searchkick.callbacks_value || self.class.searchkick_index.options[:callbacks] || true
31 mode = :inline if mode == :bulk 31 mode = :inline if mode == :bulk
32 - result = self.class.searchkick_index.send(:bulk_record_indexer).reindex([self], mode: mode, method_name: method_name, single: true) 32 + result = self.class.searchkick_index.send(:record_indexer).reindex([self], mode: mode, method_name: method_name, single: true)
33 self.class.searchkick_index.refresh if refresh 33 self.class.searchkick_index.refresh if refresh
34 result 34 result
35 end 35 end
lib/searchkick/process_batch_job.rb
@@ -15,7 +15,7 @@ module Searchkick @@ -15,7 +15,7 @@ module Searchkick
15 15
16 relation = klass 16 relation = klass
17 relation = relation.search_import if relation.respond_to?(:search_import) 17 relation = relation.search_import if relation.respond_to?(:search_import)
18 - index.send(:bulk_record_indexer).reindex_items(relation, items, method_name: nil) 18 + index.send(:record_indexer).reindex_items(relation, items, method_name: nil)
19 end 19 end
20 end 20 end
21 end 21 end
lib/searchkick/record_indexer.rb
1 module Searchkick 1 module Searchkick
2 class RecordIndexer 2 class RecordIndexer
3 - attr_reader :record, :index 3 + attr_reader :index
4 4
5 - def initialize(record)  
6 - @record = record  
7 - @index = record.class.searchkick_index 5 + def initialize(index)
  6 + @index = index
8 end 7 end
9 8
10 - def reindex(method_name = nil, refresh: false, mode: nil)  
11 - unless [:inline, true, nil, :async, :queue].include?(mode)  
12 - raise ArgumentError, "Invalid value for mode"  
13 - end  
14 -  
15 - mode ||= Searchkick.callbacks_value || index.options[:callbacks] || true 9 + def reindex(records, mode:, method_name:, full: false, single: false)
  10 + return if records.empty?
16 11
17 case mode 12 case mode
18 - when :queue  
19 - if method_name  
20 - raise Searchkick::Error, "Partial reindex not supported with queue option"  
21 - end  
22 -  
23 - index.reindex_queue.push_records([record])  
24 when :async 13 when :async
25 unless defined?(ActiveJob) 14 unless defined?(ActiveJob)
26 raise Searchkick::Error, "Active Job not found" 15 raise Searchkick::Error, "Active Job not found"
27 end 16 end
28 17
29 - # always pass routing in case record is deleted  
30 - # before the async job runs  
31 - if record.respond_to?(:search_routing)  
32 - routing = record.search_routing  
33 - end 18 + # temporary hack
  19 + if single
  20 + record = records.first
  21 +
  22 + # always pass routing in case record is deleted
  23 + # before the async job runs
  24 + if record.respond_to?(:search_routing)
  25 + routing = record.search_routing
  26 + end
34 27
35 - Searchkick::ReindexV2Job.perform_later(  
36 - record.class.name,  
37 - record.id.to_s,  
38 - method_name ? method_name.to_s : nil,  
39 - routing: routing  
40 - )  
41 - else # bulk, inline/true/nil  
42 - reindex_record(method_name) 28 + Searchkick::ReindexV2Job.perform_later(
  29 + record.class.name,
  30 + record.id.to_s,
  31 + method_name ? method_name.to_s : nil,
  32 + routing: routing
  33 + )
  34 + else
  35 + Searchkick::BulkReindexJob.perform_later(
  36 + class_name: records.first.class.searchkick_options[:class_name],
  37 + record_ids: records.map(&:id),
  38 + index_name: index.name,
  39 + method_name: method_name ? method_name.to_s : nil
  40 + )
  41 + end
  42 + when :queue
  43 + if method_name
  44 + raise Searchkick::Error, "Partial reindex not supported with queue option"
  45 + end
43 46
44 - index.refresh if refresh 47 + index.reindex_queue.push_records(records)
  48 + when true, :inline
  49 + index_records, other_records = records.partition { |r| index_record?(r) }
  50 + import_inline(index_records, !full ? other_records : [], method_name: method_name)
  51 + else
  52 + raise ArgumentError, "Invalid value for mode"
45 end 53 end
46 end 54 end
47 55
  56 + def reindex_items(klass, items, method_name:)
  57 + routing = items.to_h { |r| [r[:id], r[:routing]] }
  58 + record_ids = routing.keys
  59 +
  60 + scope = Searchkick.load_records(klass, record_ids)
  61 + # scope = scope.search_import if scope.respond_to?(:search_import)
  62 + records = scope.select(&:should_index?)
  63 +
  64 + # determine which records to delete
  65 + delete_ids = record_ids - records.map { |r| r.id.to_s }
  66 + delete_records =
  67 + delete_ids.map do |id|
  68 + construct_record(klass, id, routing[id])
  69 + end
  70 +
  71 + import_inline(records, delete_records, method_name: method_name)
  72 + end
  73 +
48 private 74 private
49 75
50 - def reindex_record(method_name)  
51 - if record.destroyed? || !record.persisted? || !record.should_index?  
52 - begin  
53 - index.remove(record)  
54 - rescue => e  
55 - raise e unless Searchkick.not_found_error?(e)  
56 - # do nothing if not found 76 + def index_record?(record)
  77 + record.persisted? && !record.destroyed? && record.should_index?
  78 + end
  79 +
  80 + # import in single request with retries
  81 + def import_inline(index_records, delete_records, method_name:)
  82 + return if index_records.empty? && delete_records.empty?
  83 +
  84 + action = method_name ? "Update" : "Import"
  85 + name = (index_records.first || delete_records.first).searchkick_klass.name
  86 + with_retries do
  87 + Searchkick.callbacks(:bulk, message: "#{name} #{action}") do
  88 + if index_records.any?
  89 + if method_name
  90 + index.bulk_update(index_records, method_name)
  91 + else
  92 + index.bulk_index(index_records)
  93 + end
  94 + end
  95 +
  96 + if delete_records.any?
  97 + index.bulk_delete(delete_records)
  98 + end
57 end 99 end
58 - else  
59 - if method_name  
60 - index.update_record(record, method_name)  
61 - else  
62 - index.store(record) 100 + end
  101 + end
  102 +
  103 + def construct_record(klass, id, routing)
  104 + record = klass.new
  105 + record.id = id
  106 + if routing
  107 + record.define_singleton_method(:search_routing) do
  108 + routing
  109 + end
  110 + end
  111 + record
  112 + end
  113 +
  114 + def with_retries
  115 + retries = 0
  116 +
  117 + begin
  118 + yield
  119 + rescue Faraday::ClientError => e
  120 + if retries < 1
  121 + retries += 1
  122 + retry
63 end 123 end
  124 + raise e
64 end 125 end
65 end 126 end
66 end 127 end
lib/searchkick/reindex_v2_job.rb
@@ -8,7 +8,7 @@ module Searchkick @@ -8,7 +8,7 @@ module Searchkick
8 model = model.unscoped if model.respond_to?(:unscoped) 8 model = model.unscoped if model.respond_to?(:unscoped)
9 items = [{id: id, routing: routing}] 9 items = [{id: id, routing: routing}]
10 # TODO improve notification 10 # TODO improve notification
11 - model.searchkick_index.send(:bulk_record_indexer).reindex_items(model, items, method_name: method_name) 11 + model.searchkick_index.send(:record_indexer).reindex_items(model, items, method_name: method_name)
12 end 12 end
13 end 13 end
14 end 14 end
lib/searchkick/relation_indexer.rb
@@ -49,7 +49,7 @@ module Searchkick @@ -49,7 +49,7 @@ module Searchkick
49 private 49 private
50 50
51 def import_or_update(records, method_name, mode, full) 51 def import_or_update(records, method_name, mode, full)
52 - bulk_record_indexer.reindex( 52 + record_indexer.reindex(
53 records, 53 records,
54 mode: mode, 54 mode: mode,
55 method_name: method_name, 55 method_name: method_name,
@@ -131,8 +131,8 @@ module Searchkick @@ -131,8 +131,8 @@ module Searchkick
131 @batch_size ||= index.options[:batch_size] || 1000 131 @batch_size ||= index.options[:batch_size] || 1000
132 end 132 end
133 133
134 - def bulk_record_indexer  
135 - @bulk_record_indexer ||= index.send(:bulk_record_indexer) 134 + def record_indexer
  135 + @record_indexer ||= index.send(:record_indexer)
136 end 136 end
137 end 137 end
138 end 138 end