Commit d3b7360ba0408df15e9ae70607042f68a70b3ee5

Authored by Andrew Kane
1 parent dd3c8ad6

Started split of record and relation reindexing [skip ci]

lib/searchkick.rb
@@ -11,6 +11,7 @@ require "forwardable" @@ -11,6 +11,7 @@ require "forwardable"
11 11
12 # modules 12 # modules
13 require "searchkick/bulk_indexer" 13 require "searchkick/bulk_indexer"
  14 +require "searchkick/bulk_record_indexer"
14 require "searchkick/controller_runtime" 15 require "searchkick/controller_runtime"
15 require "searchkick/index" 16 require "searchkick/index"
16 require "searchkick/index_options" 17 require "searchkick/index_options"
lib/searchkick/bulk_indexer.rb
@@ -27,7 +27,7 @@ module Searchkick @@ -27,7 +27,7 @@ module Searchkick
27 construct_record(klass, id, routing[id]) 27 construct_record(klass, id, routing[id])
28 end 28 end
29 29
30 - import_inline(records, delete_records, method_name: nil) 30 + bulk_record_indexer.import_inline(records, delete_records, method_name: nil)
31 end 31 end
32 32
33 def construct_record(klass, id, routing) 33 def construct_record(klass, id, routing)
@@ -80,49 +80,12 @@ module Searchkick @@ -80,49 +80,12 @@ module Searchkick
80 private 80 private
81 81
82 def import_or_update(records, method_name, mode, full) 82 def import_or_update(records, method_name, mode, full)
83 - if records.any?  
84 - case mode  
85 - when :async  
86 - Searchkick::BulkReindexJob.perform_later(  
87 - class_name: records.first.class.searchkick_options[:class_name],  
88 - record_ids: records.map(&:id),  
89 - index_name: index.name,  
90 - method_name: method_name ? method_name.to_s : nil  
91 - )  
92 - when :queue  
93 - if method_name  
94 - raise Searchkick::Error, "Partial reindex not supported with queue option"  
95 - end  
96 -  
97 - index.reindex_queue.push_records(records)  
98 - when true, :inline  
99 - index_records, other_records = records.partition(&:should_index?)  
100 - import_inline(index_records, !full ? other_records : [], method_name: method_name)  
101 - else  
102 - raise ArgumentError, "Invalid value for mode"  
103 - end  
104 - end  
105 - end  
106 -  
107 - # import in single request with retries  
108 - def import_inline(index_records, delete_records, method_name:)  
109 - action = method_name ? "Update" : "Import"  
110 - name = (index_records.first || delete_records.first).searchkick_klass.name  
111 - with_retries do  
112 - Searchkick.callbacks(:bulk, message: "#{name} #{action}") do  
113 - if index_records.any?  
114 - if method_name  
115 - index.bulk_update(index_records, method_name)  
116 - else  
117 - index.bulk_index(index_records)  
118 - end  
119 - end  
120 -  
121 - if delete_records.any?  
122 - index.bulk_delete(delete_records)  
123 - end  
124 - end  
125 - end 83 + bulk_record_indexer.reindex(
  84 + records,
  85 + mode: mode,
  86 + method_name: method_name,
  87 + full: full
  88 + )
126 end 89 end
127 90
128 def full_reindex_async(scope) 91 def full_reindex_async(scope)
@@ -191,20 +154,6 @@ module Searchkick @@ -191,20 +154,6 @@ module Searchkick
191 }.merge(options)) 154 }.merge(options))
192 end 155 end
193 156
194 - def with_retries  
195 - retries = 0  
196 -  
197 - begin  
198 - yield  
199 - rescue Faraday::ClientError => e  
200 - if retries < 1  
201 - retries += 1  
202 - retry  
203 - end  
204 - raise e  
205 - end  
206 - end  
207 -  
208 def batches_key 157 def batches_key
209 "searchkick:reindex:#{index.name}:batches" 158 "searchkick:reindex:#{index.name}:batches"
210 end 159 end
@@ -212,5 +161,9 @@ module Searchkick @@ -212,5 +161,9 @@ module Searchkick
212 def batch_size 161 def batch_size
213 @batch_size ||= index.options[:batch_size] || 1000 162 @batch_size ||= index.options[:batch_size] || 1000
214 end 163 end
  164 +
  165 + def bulk_record_indexer
  166 + @bulk_record_indexer ||= BulkRecordIndexer.new(index)
  167 + end
215 end 168 end
216 end 169 end
lib/searchkick/bulk_record_indexer.rb 0 → 100644
@@ -0,0 +1,74 @@ @@ -0,0 +1,74 @@
  1 +module Searchkick
  2 + class BulkRecordIndexer
  3 + attr_reader :index
  4 +
  5 + def initialize(index)
  6 + @index = index
  7 + end
  8 +
  9 + def reindex(records, mode:, method_name:, full:)
  10 + return if records.empty?
  11 +
  12 + case mode
  13 + when :async
  14 + Searchkick::BulkReindexJob.perform_later(
  15 + class_name: records.first.class.searchkick_options[:class_name],
  16 + record_ids: records.map(&:id),
  17 + index_name: index.name,
  18 + method_name: method_name ? method_name.to_s : nil
  19 + )
  20 + when :queue
  21 + if method_name
  22 + raise Searchkick::Error, "Partial reindex not supported with queue option"
  23 + end
  24 +
  25 + index.reindex_queue.push_records(records)
  26 + when true, :inline
  27 + index_records, other_records = records.partition(&:should_index?)
  28 + import_inline(index_records, !full ? other_records : [], method_name: method_name)
  29 + else
  30 + raise ArgumentError, "Invalid value for mode"
  31 + end
  32 + end
  33 +
  34 + # import in single request with retries
  35 + # TODO make private
  36 + def import_inline(index_records, delete_records, method_name:)
  37 + return if index_records.empty? && delete_records.empty?
  38 +
  39 + action = method_name ? "Update" : "Import"
  40 + name = (index_records.first || delete_records.first).searchkick_klass.name
  41 + with_retries do
  42 + Searchkick.callbacks(:bulk, message: "#{name} #{action}") do
  43 + if index_records.any?
  44 + if method_name
  45 + index.bulk_update(index_records, method_name)
  46 + else
  47 + index.bulk_index(index_records)
  48 + end
  49 + end
  50 +
  51 + if delete_records.any?
  52 + index.bulk_delete(delete_records)
  53 + end
  54 + end
  55 + end
  56 + end
  57 +
  58 + private
  59 +
  60 + def with_retries
  61 + retries = 0
  62 +
  63 + begin
  64 + yield
  65 + rescue Faraday::ClientError => e
  66 + if retries < 1
  67 + retries += 1
  68 + retry
  69 + end
  70 + raise e
  71 + end
  72 + end
  73 + end
  74 +end