Commit 2321a81ea8e1cb05594fdbbac4841f4e8bf6973a

Authored by Andrew Kane
1 parent 0e98b154
Exists in reindex_refactor

Started reindex refactor

lib/searchkick/model.rb
... ... @@ -76,7 +76,7 @@ module Searchkick
76 76 end
77 77  
78 78 def reindex(method_name = nil, **options)
79   - RecordIndexer.new(self).reindex(method_name, **options)
  79 + RecordIndexer.new(self.class.searchkick_index).reindex_records([self], method_name: method_name, **options)
80 80 end unless method_defined?(:reindex)
81 81  
82 82 # TODO switch to keyword arguments
... ...
lib/searchkick/record_indexer.rb
1 1 module Searchkick
2 2 class RecordIndexer
3   - attr_reader :record, :index
  3 + attr_reader :index
4 4  
5   - def initialize(record)
6   - @record = record
7   - @index = record.class.searchkick_index
  5 + def initialize(index)
  6 + @index = index
8 7 end
9 8  
10   - def reindex(method_name = nil, refresh: false, mode: nil)
  9 + def reindex_records(records, mode: nil, method_name: nil, refresh: false)
11 10 unless [:inline, true, nil, :async, :queue].include?(mode)
12 11 raise ArgumentError, "Invalid value for mode"
13 12 end
14 13  
  14 + # check afterwards for bulk
15 15 mode ||= Searchkick.callbacks_value || index.options[:callbacks] || true
16 16  
17 17 case mode
... ... @@ -20,35 +20,59 @@ module Searchkick
20 20 raise Searchkick::Error, "Partial reindex not supported with queue option"
21 21 end
22 22  
23   - # always pass routing in case record is deleted
24   - # before the queue job runs
25   - if record.respond_to?(:search_routing)
26   - routing = record.search_routing
27   - end
  23 + record_ids =
  24 + records.map do |record|
  25 + # always pass routing in case record is deleted
  26 + # before the queue job runs
  27 + if record.respond_to?(:search_routing)
  28 + routing = record.search_routing
  29 + end
  30 +
  31 + # escape pipe with double pipe
  32 + value = queue_escape(record.id.to_s)
  33 + value = "#{value}|#{queue_escape(routing)}" if routing
  34 + value
  35 + end
28 36  
29   - # escape pipe with double pipe
30   - value = queue_escape(record.id.to_s)
31   - value = "#{value}|#{queue_escape(routing)}" if routing
32   - index.reindex_queue.push(value)
  37 + index.reindex_queue.push(*record_ids)
33 38 when :async
34 39 unless defined?(ActiveJob)
35 40 raise Searchkick::Error, "Active Job not found"
36 41 end
37 42  
38   - # always pass routing in case record is deleted
39   - # before the async job runs
40   - if record.respond_to?(:search_routing)
41   - routing = record.search_routing
42   - end
  43 + # TODO use single job
  44 + records.each do |record|
  45 + # always pass routing in case record is deleted
  46 + # before the async job runs
  47 + if record.respond_to?(:search_routing)
  48 + routing = record.search_routing
  49 + end
43 50  
44   - Searchkick::ReindexV2Job.perform_later(
45   - record.class.name,
46   - record.id.to_s,
47   - method_name ? method_name.to_s : nil,
48   - routing: routing
49   - )
  51 + Searchkick::ReindexV2Job.perform_later(
  52 + record.class.name,
  53 + record.id.to_s,
  54 + method_name ? method_name.to_s : nil,
  55 + routing: routing
  56 + )
  57 + end
50 58 else # bulk, inline/true/nil
51   - reindex_record(method_name)
  59 + delete_records, index_records = records.partition { |r| r.destroyed? || !r.persisted? || !r.should_index? }
  60 +
  61 + # TODO use
  62 + # index.bulk_delete(delete_records)
  63 + delete_records.each do |record|
  64 + begin
  65 + index.remove(record)
  66 + rescue Elasticsearch::Transport::Transport::Errors::NotFound
  67 + # do nothing
  68 + end
  69 + end
  70 +
  71 + if method_name
  72 + index.bulk_update(index_records, method_name)
  73 + else
  74 + index.bulk_index(index_records)
  75 + end
52 76  
53 77 index.refresh if refresh
54 78 end
... ... @@ -59,21 +83,5 @@ module Searchkick
59 83 def queue_escape(value)
60 84 value.gsub("|", "||")
61 85 end
62   -
63   - def reindex_record(method_name)
64   - if record.destroyed? || !record.persisted? || !record.should_index?
65   - begin
66   - index.remove(record)
67   - rescue Elasticsearch::Transport::Transport::Errors::NotFound
68   - # do nothing
69   - end
70   - else
71   - if method_name
72   - index.update_record(record, method_name)
73   - else
74   - index.store(record)
75   - end
76   - end
77   - end
78 86 end
79 87 end
... ...
lib/searchkick/reindex_queue.rb
... ... @@ -8,8 +8,8 @@ module Searchkick
8 8 raise Searchkick::Error, "Searchkick.redis not set" unless Searchkick.redis
9 9 end
10 10  
11   - def push(record_id)
12   - Searchkick.with_redis { |r| r.lpush(redis_key, record_id) }
  11 + def push(*record_ids)
  12 + Searchkick.with_redis { |r| r.lpush(redis_key, *record_ids) }
13 13 end
14 14  
15 15 # TODO use reliable queuing
... ...
lib/searchkick/reindex_v2_job.rb
... ... @@ -35,7 +35,7 @@ module Searchkick
35 35 end
36 36 end
37 37  
38   - RecordIndexer.new(record).reindex(method_name, mode: :inline)
  38 + RecordIndexer.new(model.searchkick_index).reindex_records([record], method_name: method_name, mode: :inline)
39 39 end
40 40 end
41 41 end
... ...