Commit ae0b5f8f95134ddebb045be36be5d8df35ded341

Authored by Andrew Kane
1 parent c8178e59

Renamed BulkIndexer to RelationIndexer [skip ci]

lib/searchkick.rb
... ... @@ -10,7 +10,6 @@ require "hashie"
10 10 require "forwardable"
11 11  
12 12 # modules
13   -require "searchkick/bulk_indexer"
14 13 require "searchkick/bulk_record_indexer"
15 14 require "searchkick/controller_runtime"
16 15 require "searchkick/index"
... ... @@ -25,6 +24,7 @@ require "searchkick/reindex_queue"
25 24 require "searchkick/record_data"
26 25 require "searchkick/record_indexer"
27 26 require "searchkick/relation"
  27 +require "searchkick/relation_indexer"
28 28 require "searchkick/results"
29 29 require "searchkick/version"
30 30  
... ...
lib/searchkick/bulk_indexer.rb
... ... @@ -1,168 +0,0 @@
1   -module Searchkick
2   - class BulkIndexer
3   - attr_reader :index
4   -
5   - def initialize(index)
6   - @index = index
7   - end
8   -
9   - def import_queue(klass, record_ids)
10   - # separate routing from id
11   - routing = Hash[record_ids.map { |r| r.split(/(?<!\|)\|(?!\|)/, 2).map { |v| v.gsub("||", "|") } }]
12   - record_ids = routing.keys
13   -
14   - scope = Searchkick.load_records(klass, record_ids)
15   - scope = scope.search_import if scope.respond_to?(:search_import)
16   - records = scope.select(&:should_index?)
17   -
18   - # determine which records to delete
19   - delete_ids = record_ids - records.map { |r| r.id.to_s }
20   - delete_records =
21   - delete_ids.map do |id|
22   - construct_record(klass, id, routing[id])
23   - end
24   -
25   - bulk_record_indexer.import_inline(records, delete_records, method_name: nil)
26   - end
27   -
28   - def construct_record(klass, id, routing)
29   - record = klass.new
30   - record.id = id
31   - if routing
32   - record.define_singleton_method(:search_routing) do
33   - routing
34   - end
35   - end
36   - record
37   - end
38   -
39   - def import_scope(relation, resume: false, method_name: nil, async: false, full: false, scope: nil, mode: nil)
40   - if scope
41   - relation = relation.send(scope)
42   - elsif relation.respond_to?(:search_import)
43   - relation = relation.search_import
44   - end
45   -
46   - mode ||= (async ? :async : :inline)
47   -
48   - if full && async
49   - full_reindex_async(relation)
50   - elsif relation.respond_to?(:find_in_batches)
51   - if resume
52   - # use total docs instead of max id since there's not a great way
53   - # to get the max _id without scripting since it's a string
54   -
55   - # TODO use primary key and prefix with table name
56   - relation = relation.where("id > ?", index.total_docs)
57   - end
58   -
59   - relation = relation.select("id").except(:includes, :preload) if mode == :async
60   -
61   - relation.find_in_batches batch_size: batch_size do |items|
62   - import_or_update items, method_name, mode, full
63   - end
64   - else
65   - each_batch(relation) do |items|
66   - import_or_update items, method_name, mode, full
67   - end
68   - end
69   - end
70   -
71   - def batches_left
72   - Searchkick.with_redis { |r| r.scard(batches_key) }
73   - end
74   -
75   - def batch_completed(batch_id)
76   - Searchkick.with_redis { |r| r.srem(batches_key, batch_id) }
77   - end
78   -
79   - private
80   -
81   - def import_or_update(records, method_name, mode, full)
82   - bulk_record_indexer.reindex(
83   - records,
84   - mode: mode,
85   - method_name: method_name,
86   - full: full
87   - )
88   - end
89   -
90   - def full_reindex_async(scope)
91   - if scope.respond_to?(:primary_key)
92   - # TODO expire Redis key
93   - primary_key = scope.primary_key
94   -
95   - scope = scope.select(primary_key).except(:includes, :preload)
96   -
97   - starting_id =
98   - begin
99   - scope.minimum(primary_key)
100   - rescue ActiveRecord::StatementInvalid
101   - false
102   - end
103   -
104   - if starting_id.nil?
105   - # no records, do nothing
106   - elsif starting_id.is_a?(Numeric)
107   - max_id = scope.maximum(primary_key)
108   - batches_count = ((max_id - starting_id + 1) / batch_size.to_f).ceil
109   -
110   - batches_count.times do |i|
111   - batch_id = i + 1
112   - min_id = starting_id + (i * batch_size)
113   - bulk_reindex_job scope, batch_id, min_id: min_id, max_id: min_id + batch_size - 1
114   - end
115   - else
116   - scope.find_in_batches(batch_size: batch_size).each_with_index do |batch, i|
117   - batch_id = i + 1
118   -
119   - bulk_reindex_job scope, batch_id, record_ids: batch.map { |record| record.id.to_s }
120   - end
121   - end
122   - else
123   - batch_id = 1
124   - # TODO remove any eager loading
125   - scope = scope.only(:_id) if scope.respond_to?(:only)
126   - each_batch(scope) do |items|
127   - bulk_reindex_job scope, batch_id, record_ids: items.map { |i| i.id.to_s }
128   - batch_id += 1
129   - end
130   - end
131   - end
132   -
133   - def each_batch(scope)
134   - # https://github.com/karmi/tire/blob/master/lib/tire/model/import.rb
135   - # use cursor for Mongoid
136   - items = []
137   - scope.all.each do |item|
138   - items << item
139   - if items.length == batch_size
140   - yield items
141   - items = []
142   - end
143   - end
144   - yield items if items.any?
145   - end
146   -
147   - def bulk_reindex_job(scope, batch_id, options)
148   - Searchkick.with_redis { |r| r.sadd(batches_key, batch_id) }
149   - Searchkick::BulkReindexJob.perform_later(**{
150   - class_name: scope.searchkick_options[:class_name],
151   - index_name: index.name,
152   - batch_id: batch_id
153   - }.merge(options))
154   - end
155   -
156   - def batches_key
157   - "searchkick:reindex:#{index.name}:batches"
158   - end
159   -
160   - def batch_size
161   - @batch_size ||= index.options[:batch_size] || 1000
162   - end
163   -
164   - def bulk_record_indexer
165   - @bulk_record_indexer ||= index.send(:bulk_record_indexer)
166   - end
167   - end
168   -end
lib/searchkick/bulk_reindex_job.rb
... ... @@ -10,7 +10,7 @@ module Searchkick
10 10  
11 11 # TODO expose functionality on index
12 12 index.send(:bulk_record_indexer).reindex(records, mode: :inline, method_name: method_name, full: false)
13   - index.send(:bulk_indexer).batch_completed(batch_id) if batch_id
  13 + index.send(:relation_indexer).batch_completed(batch_id) if batch_id
14 14 end
15 15 end
16 16 end
... ...
lib/searchkick/index.rb
... ... @@ -235,11 +235,11 @@ module Searchkick
235 235 end
236 236  
237 237 def import_scope(relation, **options)
238   - bulk_indexer.import_scope(relation, **options)
  238 + relation_indexer.import_scope(relation, **options)
239 239 end
240 240  
241 241 def batches_left
242   - bulk_indexer.batches_left
  242 + relation_indexer.batches_left
243 243 end
244 244  
245 245 # other
... ... @@ -298,8 +298,8 @@ module Searchkick
298 298 Searchkick.indexer.queue(records.map { |r| RecordData.new(self, r).update_data(method_name) })
299 299 end
300 300  
301   - def bulk_indexer
302   - @bulk_indexer ||= BulkIndexer.new(self)
  301 + def relation_indexer
  302 + @relation_indexer ||= RelationIndexer.new(self)
303 303 end
304 304  
305 305 def bulk_record_indexer
... ...
lib/searchkick/process_batch_job.rb
... ... @@ -5,7 +5,8 @@ module Searchkick
5 5 def perform(class_name:, record_ids:, index_name: nil)
6 6 klass = class_name.constantize
7 7 index = klass.searchkick_index(name: index_name)
8   - index.send(:bulk_indexer).import_queue(klass, record_ids)
  8 + # TODO move import_queue logic
  9 + index.send(:relation_indexer).import_queue(klass, record_ids)
9 10 end
10 11 end
11 12 end
... ...
lib/searchkick/reindex_v2_job.rb
... ... @@ -23,7 +23,8 @@ module Searchkick
23 23 nil
24 24 end
25 25  
26   - record ||= model.searchkick_index.send(:bulk_indexer).construct_record(model, id, routing)
  26 + # TODO move construct_record logic
  27 + record ||= model.searchkick_index.send(:relation_indexer).construct_record(model, id, routing)
27 28  
28 29 RecordIndexer.new(record).reindex(method_name, mode: :inline)
29 30 end
... ...
lib/searchkick/relation_indexer.rb 0 โ†’ 100644
... ... @@ -0,0 +1,168 @@
  1 +module Searchkick
  2 + class RelationIndexer
  3 + attr_reader :index
  4 +
  5 + def initialize(index)
  6 + @index = index
  7 + end
  8 +
  9 + def import_queue(klass, record_ids)
  10 + # separate routing from id
  11 + routing = Hash[record_ids.map { |r| r.split(/(?<!\|)\|(?!\|)/, 2).map { |v| v.gsub("||", "|") } }]
  12 + record_ids = routing.keys
  13 +
  14 + scope = Searchkick.load_records(klass, record_ids)
  15 + scope = scope.search_import if scope.respond_to?(:search_import)
  16 + records = scope.select(&:should_index?)
  17 +
  18 + # determine which records to delete
  19 + delete_ids = record_ids - records.map { |r| r.id.to_s }
  20 + delete_records =
  21 + delete_ids.map do |id|
  22 + construct_record(klass, id, routing[id])
  23 + end
  24 +
  25 + bulk_record_indexer.import_inline(records, delete_records, method_name: nil)
  26 + end
  27 +
  28 + def construct_record(klass, id, routing)
  29 + record = klass.new
  30 + record.id = id
  31 + if routing
  32 + record.define_singleton_method(:search_routing) do
  33 + routing
  34 + end
  35 + end
  36 + record
  37 + end
  38 +
  39 + def import_scope(relation, resume: false, method_name: nil, async: false, full: false, scope: nil, mode: nil)
  40 + if scope
  41 + relation = relation.send(scope)
  42 + elsif relation.respond_to?(:search_import)
  43 + relation = relation.search_import
  44 + end
  45 +
  46 + mode ||= (async ? :async : :inline)
  47 +
  48 + if full && async
  49 + full_reindex_async(relation)
  50 + elsif relation.respond_to?(:find_in_batches)
  51 + if resume
  52 + # use total docs instead of max id since there's not a great way
  53 + # to get the max _id without scripting since it's a string
  54 +
  55 + # TODO use primary key and prefix with table name
  56 + relation = relation.where("id > ?", index.total_docs)
  57 + end
  58 +
  59 + relation = relation.select("id").except(:includes, :preload) if mode == :async
  60 +
  61 + relation.find_in_batches batch_size: batch_size do |items|
  62 + import_or_update items, method_name, mode, full
  63 + end
  64 + else
  65 + each_batch(relation) do |items|
  66 + import_or_update items, method_name, mode, full
  67 + end
  68 + end
  69 + end
  70 +
  71 + def batches_left
  72 + Searchkick.with_redis { |r| r.scard(batches_key) }
  73 + end
  74 +
  75 + def batch_completed(batch_id)
  76 + Searchkick.with_redis { |r| r.srem(batches_key, batch_id) }
  77 + end
  78 +
  79 + private
  80 +
  81 + def import_or_update(records, method_name, mode, full)
  82 + bulk_record_indexer.reindex(
  83 + records,
  84 + mode: mode,
  85 + method_name: method_name,
  86 + full: full
  87 + )
  88 + end
  89 +
  90 + def full_reindex_async(scope)
  91 + if scope.respond_to?(:primary_key)
  92 + # TODO expire Redis key
  93 + primary_key = scope.primary_key
  94 +
  95 + scope = scope.select(primary_key).except(:includes, :preload)
  96 +
  97 + starting_id =
  98 + begin
  99 + scope.minimum(primary_key)
  100 + rescue ActiveRecord::StatementInvalid
  101 + false
  102 + end
  103 +
  104 + if starting_id.nil?
  105 + # no records, do nothing
  106 + elsif starting_id.is_a?(Numeric)
  107 + max_id = scope.maximum(primary_key)
  108 + batches_count = ((max_id - starting_id + 1) / batch_size.to_f).ceil
  109 +
  110 + batches_count.times do |i|
  111 + batch_id = i + 1
  112 + min_id = starting_id + (i * batch_size)
  113 + bulk_reindex_job scope, batch_id, min_id: min_id, max_id: min_id + batch_size - 1
  114 + end
  115 + else
  116 + scope.find_in_batches(batch_size: batch_size).each_with_index do |batch, i|
  117 + batch_id = i + 1
  118 +
  119 + bulk_reindex_job scope, batch_id, record_ids: batch.map { |record| record.id.to_s }
  120 + end
  121 + end
  122 + else
  123 + batch_id = 1
  124 + # TODO remove any eager loading
  125 + scope = scope.only(:_id) if scope.respond_to?(:only)
  126 + each_batch(scope) do |items|
  127 + bulk_reindex_job scope, batch_id, record_ids: items.map { |i| i.id.to_s }
  128 + batch_id += 1
  129 + end
  130 + end
  131 + end
  132 +
  133 + def each_batch(scope)
  134 + # https://github.com/karmi/tire/blob/master/lib/tire/model/import.rb
  135 + # use cursor for Mongoid
  136 + items = []
  137 + scope.all.each do |item|
  138 + items << item
  139 + if items.length == batch_size
  140 + yield items
  141 + items = []
  142 + end
  143 + end
  144 + yield items if items.any?
  145 + end
  146 +
  147 + def bulk_reindex_job(scope, batch_id, options)
  148 + Searchkick.with_redis { |r| r.sadd(batches_key, batch_id) }
  149 + Searchkick::BulkReindexJob.perform_later(**{
  150 + class_name: scope.searchkick_options[:class_name],
  151 + index_name: index.name,
  152 + batch_id: batch_id
  153 + }.merge(options))
  154 + end
  155 +
  156 + def batches_key
  157 + "searchkick:reindex:#{index.name}:batches"
  158 + end
  159 +
  160 + def batch_size
  161 + @batch_size ||= index.options[:batch_size] || 1000
  162 + end
  163 +
  164 + def bulk_record_indexer
  165 + @bulk_record_indexer ||= index.send(:bulk_record_indexer)
  166 + end
  167 + end
  168 +end
... ...