record_indexer.rb
3.47 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
module Searchkick
class RecordIndexer
attr_reader :index
# Builds an indexer bound to a single index.
#
# index - object stored in @index and exposed through the +index+ reader;
#         the other methods of this class call name, reindex_queue,
#         bulk_index, bulk_update and bulk_delete on it.
def initialize(index)
@index = index
end
# Reindexes +records+ using the strategy selected by +mode+.
#
# records     - collection responding to empty?, first, map and partition
# mode        - :async enqueues a background job, :queue pushes the records
#               onto index.reindex_queue, true / :inline imports right away
# method_name - optional partial-update method; sent to jobs as a String,
#               rejected for :queue, forwarded as-is for inline imports
# full        - inline only: when true, records failing index_record? are
#               not handed to import_inline for deletion
# single      - async only: when true, only records.first is enqueued
#
# Returns nil when +records+ is empty, otherwise the value of the branch taken.
# Raises Searchkick::Error when Active Job is missing (:async) or when a
# method_name is combined with :queue; raises ArgumentError for any other mode.
def reindex(records, mode:, method_name:, full: false, single: false)
  return if records.empty?

  case mode
  when :async
    raise Searchkick::Error, "Active Job not found" unless defined?(ActiveJob)

    job_method = method_name ? method_name.to_s : nil

    # temporary hack
    if single
      target = records.first

      # always pass routing in case record is deleted
      # before the async job runs
      routing = target.search_routing if target.respond_to?(:search_routing)

      Searchkick::ReindexV2Job.perform_later(
        target.class.name,
        target.id.to_s,
        job_method,
        routing: routing
      )
    else
      # NOTE(review): ids are sent as-is here, while the single-record branch
      # stringifies them - confirm BulkReindexJob accepts every id type.
      Searchkick::BulkReindexJob.perform_later(
        class_name: records.first.class.searchkick_options[:class_name],
        record_ids: records.map(&:id),
        index_name: index.name,
        method_name: job_method
      )
    end
  when :queue
    raise Searchkick::Error, "Partial reindex not supported with queue option" if method_name

    index.reindex_queue.push_records(records)
  when true, :inline
    indexable, remaining = records.partition { |record| index_record?(record) }
    removable = full ? [] : remaining
    import_inline(indexable, removable, method_name: method_name)
  else
    raise ArgumentError, "Invalid value for mode"
  end
end
# Reindexes the records referenced by +items+ and removes from the index
# every requested id that has no loaded, indexable record.
#
# klass       - model class; passed to Searchkick.load_records and used to
#               build placeholder records for deletions
# items       - enumerable of hashes read through item[:id] and item[:routing]
# method_name - forwarded to import_inline
#
# Returns whatever import_inline returns.
def reindex_items(klass, items, method_name:)
  routing = items.to_h { |item| [item[:id], item[:routing]] }
  record_ids = routing.keys

  scope = Searchkick.load_records(klass, record_ids)
  # scope = scope.search_import if scope.respond_to?(:search_import)
  records = scope.select(&:should_index?)

  # determine which records to delete: every requested id without a matching
  # indexable record. Both sides are compared by their string form so that
  # non-String ids (e.g. Integers) are not all mistaken for missing records;
  # the original id is kept for the routing lookup and the placeholder.
  indexed_ids = records.to_h { |record| [record.id.to_s, true] }
  delete_records =
    record_ids
      .reject { |id| indexed_ids.key?(id.to_s) }
      .map { |id| construct_record(klass, id, routing[id]) }

  import_inline(records, delete_records, method_name: method_name)
end
private
# Tells whether +record+ belongs in the index: it must be persisted, not
# destroyed, and its own should_index? must agree. The checks run in that
# order and stop at the first one that fails.
def index_record?(record)
  saved = record.persisted?
  return saved unless saved
  return false if record.destroyed?

  record.should_index?
end
# Import in a single request with retries.
#
# index_records  - records to index (or partially update when method_name is set)
# delete_records - records to remove from the index
# method_name    - when truthy, index.bulk_update is used instead of
#                  index.bulk_index and the callback message says "Update"
#
# Does nothing when both collections are empty. Indexing/updating is issued
# before deletion, both inside one Searchkick.callbacks(:bulk) block wrapped
# in with_retries.
def import_inline(index_records, delete_records, method_name:)
  return if [index_records, delete_records].all?(&:empty?)

  verb = method_name ? "Update" : "Import"
  sample = index_records.first || delete_records.first
  model_name = sample.searchkick_klass.name

  with_retries do
    Searchkick.callbacks(:bulk, message: "#{model_name} #{verb}") do
      if index_records.any?
        method_name ? index.bulk_update(index_records, method_name) : index.bulk_index(index_records)
      end

      index.bulk_delete(delete_records) if delete_records.any?
    end
  end
end
# Builds an unsaved +klass+ instance that only carries +id+, for use as a
# stand-in when a record has to be deleted from the index. When +routing+
# is truthy the instance also answers search_routing with that value.
def construct_record(klass, id, routing)
  klass.new.tap do |placeholder|
    placeholder.id = id
    placeholder.define_singleton_method(:search_routing) { routing } if routing
  end
end
# Runs the given block, retrying it once if it raises Faraday::ClientError.
# A second Faraday::ClientError is re-raised to the caller; any other
# exception propagates immediately. Returns the block's value.
def with_retries
  attempts = 0
  begin
    yield
  rescue Faraday::ClientError
    # only one retry is allowed; re-raise the current error after that
    raise if attempts >= 1

    attempts += 1
    retry
  end
end
end
end