# relation_indexer.rb (4.42 KB)
module Searchkick
class RelationIndexer
  # Feeds an Active Record or Mongoid relation into a search index in
  # batches, either inline or by enqueueing bulk-reindex jobs, and tracks
  # outstanding async batches in a Redis set.

  # the Searchkick::Index records are written to
  attr_reader :index

  # index - the Searchkick::Index to populate
  def initialize(index)
    @index = index
  end

  # Reindexes all records in the relation.
  #
  # relation     - an Active Record relation or Mongoid criteria
  # mode:        - indexing mode, forwarded to RecordIndexer
  #                (:async additionally trims the query and may enqueue jobs)
  # method_name: - optional partial-reindex method, forwarded to RecordIndexer
  # full:        - with mode: :async, enqueue BulkReindexJob batches instead
  #                of indexing inline
  # resume:      - skip records presumed already indexed (Active Record only;
  #                raises Error for Mongoid)
  # scope:       - name of a scope to apply instead of the default search_import
  def reindex(relation, mode:, method_name: nil, full: false, resume: false, scope: nil)
    # apply scopes
    if scope
      relation = relation.send(scope)
    elsif relation.respond_to?(:search_import)
      relation = relation.search_import
    end

    # remove unneeded loading for async
    if mode == :async
      if relation.respond_to?(:primary_key)
        # Active Record: jobs only need ids, so select just the primary key
        # and drop eager loading
        relation = relation.select(relation.primary_key).except(:includes, :preload)
      elsif relation.respond_to?(:only)
        # Mongoid: fetch only the document id
        relation = relation.only(:_id)
      end
    end

    if mode == :async && full
      return full_reindex_async(relation)
    end

    relation = resume_relation(relation) if resume

    reindex_options = {
      mode: mode,
      method_name: method_name,
      full: full
    }
    record_indexer = RecordIndexer.new(index)

    in_batches(relation) do |items|
      record_indexer.reindex(items, **reindex_options)
    end
  end

  # Number of async batches not yet marked complete (size of the Redis set)
  def batches_left
    Searchkick.with_redis { |r| r.scard(batches_key) }
  end

  # Marks an async batch as finished by removing its id from the Redis set
  def batch_completed(batch_id)
    Searchkick.with_redis { |r| r.srem(batches_key, batch_id) }
  end

  private

  # Restricts the relation to records not yet indexed so a reindex can
  # resume where it left off. Relies on the indexed document count as a
  # primary-key watermark — assumes sequential integer-ish ids; TODO confirm
  # for non-sequential primary keys.
  def resume_relation(relation)
    if relation.respond_to?(:primary_key)
      # use total docs instead of max id since there's not a great way
      # to get the max _id without scripting since it's a string
      where = relation.arel_table[relation.primary_key].gt(index.total_docs)
      relation = relation.where(where)
    else
      raise Error, "Resume not supported for Mongoid"
    end
  end

  # Yields records in batches of batch_size, clearing any current scope
  # around each yield so it cannot leak into search_data or inline jobs.
  def in_batches(relation)
    if relation.respond_to?(:find_in_batches)
      # Active Record path
      klass = relation.klass
      # remove order to prevent possible warnings
      relation.except(:order).find_in_batches(batch_size: batch_size) do |batch|
        # prevent scope from affecting search_data as well as inline jobs
        # Active Record runs relation calls in scoping block
        # https://github.com/rails/rails/blob/main/activerecord/lib/active_record/relation/delegation.rb
        # note: we could probably just call klass.current_scope = nil
        # anywhere in reindex method (after initial all call),
        # but this is more cautious
        previous_scope = klass.current_scope(true)
        if previous_scope
          begin
            klass.current_scope = nil
            yield batch
          ensure
            # always restore the caller's scope, even if indexing raises
            klass.current_scope = previous_scope
          end
        else
          yield batch
        end
      end
    else
      # Mongoid path
      klass = relation.klass
      each_batch(relation, batch_size: batch_size) do |batch|
        # prevent scope from affecting search_data as well as inline jobs
        # note: Model.with_scope doesn't always restore scope, so use custom logic
        previous_scope = Mongoid::Threaded.current_scope(klass)
        if previous_scope
          begin
            Mongoid::Threaded.set_current_scope(nil, klass)
            yield batch
          ensure
            Mongoid::Threaded.set_current_scope(previous_scope, klass)
          end
        else
          yield batch
        end
      end
    end
  end

  # Accumulates documents from a Mongoid cursor into fixed-size batches.
  def each_batch(relation, batch_size:)
    # https://github.com/karmi/tire/blob/master/lib/tire/model/import.rb
    # use cursor for Mongoid
    items = []
    relation.all.each do |item|
      items << item
      if items.length == batch_size
        yield items
        items = []
      end
    end
    # flush the final partial batch
    yield items if items.any?
  end

  # Batch size from index options, defaulting to 1000
  def batch_size
    @batch_size ||= index.options[:batch_size] || 1000
  end

  # Enqueues one BulkReindexJob per batch of ids for a full async reindex.
  # Batch ids start at 1 and increment per batch.
  def full_reindex_async(relation)
    batch_id = 1
    class_name = relation.searchkick_options[:class_name]
    in_batches(relation) do |items|
      batch_job(class_name, batch_id, items.map(&:id))
      batch_id += 1
    end
  end

  # Registers the batch id in the Redis tracking set, then enqueues the
  # job that will index those records.
  def batch_job(class_name, batch_id, record_ids)
    Searchkick.with_redis { |r| r.sadd(batches_key, batch_id) }
    Searchkick::BulkReindexJob.perform_later(
      class_name: class_name,
      index_name: index.name,
      batch_id: batch_id,
      # integer ids pass through for job serialization; any other id type
      # (e.g. uuid strings, BSON ObjectIds) is stringified
      record_ids: record_ids.map { |v| v.instance_of?(Integer) ? v : v.to_s }
    )
  end

  # Redis key for the set of outstanding batch ids for this index
  def batches_key
    "searchkick:reindex:#{index.name}:batches"
  end
end
end