Commit acb266e4993184d8618683899b9bab22f609b152

Authored by Andrew Kane
1 parent 533e92f0

Added support for queuing updates - closes #815

.travis.yml
... ... @@ -3,6 +3,7 @@ language: ruby
3 3 rvm: 2.3.1
4 4 services:
5 5 - mongodb
  6 + - redis-server
6 7 before_install:
7 8 - ./test/ci/before_install.sh
8 9 script: RUBYOPT=W0 bundle exec rake test
... ...
CHANGELOG.md
  1 +## 2.0.4 [unreleased]
  2 +
  3 +- Added support for queuing updates [experimental]
  4 +
1 5 ## 2.0.3
2 6  
3 7 - Added `async` option to `reindex` [experimental]
... ...
Gemfile
... ... @@ -8,3 +8,4 @@ gem "activerecord", "~> 5.0.0"
8 8 gem "gemoji-parser"
9 9 gem "typhoeus"
10 10 gem "activejob"
  11 +gem "redis"
... ...
README.md
... ... @@ -1185,6 +1185,28 @@ And use:
1185 1185 Searchkick.reindex_status(index_name)
1186 1186 ```
1187 1187  
  1188 +### Queues [master, experimental, ActiveRecord only]
  1189 +
  1190 +You can also queue updates and do them in bulk for better performance. First, set up Redis in an initializer.
  1191 +
  1192 +```ruby
  1193 +Searchkick.redis = Redis.new
  1194 +```
  1195 +
  1196 +And ask your models to queue updates.
  1197 +
  1198 +```ruby
  1199 +class Product < ActiveRecord::Base
  1200 + searchkick callbacks: :queue
  1201 +end
  1202 +```
  1203 +
  1204 +Then, set up a background job to run.
  1205 +
  1206 +```ruby
  1207 +Searchkick::ProcessQueueJob.perform_later(class_name: "Product")
  1208 +```
  1209 +
1188 1210 For more tips, check out [Keeping Elasticsearch in Sync](https://www.elastic.co/blog/found-keeping-elasticsearch-in-sync).
1189 1211  
1190 1212 ## Advanced
... ...
lib/searchkick.rb
... ... @@ -5,6 +5,7 @@ require &quot;searchkick/version&quot;
5 5 require "searchkick/index_options"
6 6 require "searchkick/index"
7 7 require "searchkick/indexer"
  8 +require "searchkick/reindex_queue"
8 9 require "searchkick/results"
9 10 require "searchkick/query"
10 11 require "searchkick/model"
... ... @@ -21,6 +22,8 @@ rescue LoadError
21 22 end
22 23 if defined?(ActiveJob)
23 24 require "searchkick/bulk_reindex_job"
  25 + require "searchkick/process_queue_job"
  26 + require "searchkick/process_batch_job"
24 27 require "searchkick/reindex_v2_job"
25 28 end
26 29  
... ...
lib/searchkick/model.rb
... ... @@ -84,11 +84,36 @@ module Searchkick
84 84 after_destroy callback_name, if: proc { self.class.search_callbacks? }
85 85 end
86 86  
87   - def reindex(method_name = nil, refresh: false, async: false)
88   - if async
  87 + def reindex(method_name = nil, refresh: false, async: false, mode: nil)
  88 + klass_options = self.class.searchkick_index.options
  89 +
  90 + if mode.nil?
  91 + mode =
  92 + if async
  93 + :async
  94 + elsif Searchkick.callbacks_value
  95 + Searchkick.callbacks_value
  96 + elsif klass_options.key?(:callbacks) && klass_options[:callbacks] != :async
  97 + # TODO remove 2nd condition in next major version
  98 + klass_options[:callbacks]
  99 + end
  100 + end
  101 +
  102 + case mode
  103 + when :queue
  104 + if method_name
  105 + raise Searchkick::Error, "Partial reindex not supported with queue option"
  106 + else
  107 + Searchkick::ReindexQueue.new(self.class.searchkick_index.name).push(id.to_s)
  108 + end
  109 + when :async
89 110 if method_name
90 111 # TODO support Mongoid and NoBrainer and non-id primary keys
91   - Searchkick::BulkReindexJob.perform_later(class_name: self.class.name, record_ids: [id.to_s], method_name: method_name ? method_name.to_s : nil)
  112 + Searchkick::BulkReindexJob.perform_later(
  113 + class_name: self.class.name,
  114 + record_ids: [id.to_s],
  115 + method_name: method_name ? method_name.to_s : nil
  116 + )
92 117 else
93 118 self.class.searchkick_index.reindex_record_async(self)
94 119 end
... ...
lib/searchkick/process_batch_job.rb 0 โ†’ 100644
... ... @@ -0,0 +1,23 @@
  1 +module Searchkick
  2 + class ProcessBatchJob < ActiveJob::Base
  3 + queue_as :searchkick
  4 +
  5 + def perform(class_name:, record_ids:)
  6 + klass = class_name.constantize
  7 + scope = klass.where(klass.primary_key => record_ids)
  8 + scope = scope.search_import if scope.respond_to?(:search_import)
  9 + records = scope.select(&:should_index?)
  10 +
  11 + # determine which records to delete
  12 + delete_ids = record_ids - records.map { |r| r.id.to_s }
  13 + delete_records = delete_ids.map { |id| m = klass.new; m.id = id; m }
  14 +
  15 + # bulk reindex
  16 + index = klass.searchkick_index
  17 + Searchkick.callbacks(:bulk) do
  18 + index.bulk_index(records)
  19 + index.bulk_delete(delete_records)
  20 + end
  21 + end
  22 + end
  23 +end
... ...
lib/searchkick/process_queue_job.rb 0 โ†’ 100644
... ... @@ -0,0 +1,23 @@
  1 +module Searchkick
  2 + class ProcessQueueJob < ActiveJob::Base
  3 + queue_as :searchkick
  4 +
  5 + def perform(class_name:)
  6 + model = class_name.constantize
  7 +
  8 + limit = 1000
  9 + record_ids = Searchkick::ReindexQueue.new(model.searchkick_index.name).reserve(limit: limit)
  10 + if record_ids.any?
  11 + Searchkick::ProcessBatchJob.perform_later(
  12 + class_name: model.name,
  13 + record_ids: record_ids
  14 + )
  15 + # TODO when moving to reliable queuing, mark as complete
  16 +
  17 + if record_ids.size == limit
  18 + Searchkick::ProcessQueueJob.perform_later(class_name: class_name)
  19 + end
  20 + end
  21 + end
  22 + end
  23 +end
... ...
lib/searchkick/reindex_queue.rb 0 โ†’ 100644
... ... @@ -0,0 +1,40 @@
  1 +module Searchkick
  2 + class ReindexQueue
  3 + attr_reader :name
  4 +
  5 + def initialize(name)
  6 + @name = name
  7 + end
  8 +
  9 + def push(record_id)
  10 + redis.lpush(redis_key, record_id)
  11 + end
  12 +
  13 + # TODO use reliable queuing
  14 + def reserve(limit: 1000)
  15 + record_ids = Set.new
  16 + while record_ids.size < limit && record_id = redis.rpop(redis_key)
  17 + record_ids << record_id
  18 + end
  19 + record_ids.to_a
  20 + end
  21 +
  22 + def clear
  23 + redis.del(redis_key)
  24 + end
  25 +
  26 + def length
  27 + redis.llen(redis_key)
  28 + end
  29 +
  30 + private
  31 +
  32 + def redis
  33 + Searchkick.redis
  34 + end
  35 +
  36 + def redis_key
  37 + "searchkick:reindex_queue:#{name}"
  38 + end
  39 + end
  40 +end
... ...
test/callbacks_test.rb
... ... @@ -24,4 +24,36 @@ class CallbacksTest &lt; Minitest::Test
24 24 Product.searchkick_index.refresh
25 25 assert_search "product", ["Product A", "Product B"]
26 26 end
  27 +
  28 + def test_queue
  29 + skip unless defined?(ActiveJob) && defined?(Redis)
  30 +
  31 + reindex_queue = Searchkick::ReindexQueue.new(Product.searchkick_index.name)
  32 + reindex_queue.clear
  33 +
  34 + Searchkick.callbacks(:queue) do
  35 + store_names ["Product A", "Product B"]
  36 + end
  37 + Product.searchkick_index.refresh
  38 + assert_search "product", [], load: false
  39 + assert_equal 2, reindex_queue.length
  40 +
  41 + Searchkick::ProcessQueueJob.perform_later(class_name: "Product")
  42 + Product.searchkick_index.refresh
  43 + assert_search "product", ["Product A", "Product B"], load: false
  44 + assert_equal 0, reindex_queue.length
  45 +
  46 + Searchkick.callbacks(:queue) do
  47 + Product.where(name: "Product B").destroy_all
  48 + Product.create!(name: "Product C")
  49 + end
  50 + Product.searchkick_index.refresh
  51 + assert_search "product", ["Product A", "Product B"], load: false
  52 + assert_equal 2, reindex_queue.length
  53 +
  54 + Searchkick::ProcessQueueJob.perform_later(class_name: "Product")
  55 + Product.searchkick_index.refresh
  56 + assert_search "product", ["Product A", "Product C"], load: false
  57 + assert_equal 0, reindex_queue.length
  58 + end
27 59 end
... ...
test/test_helper.rb
... ... @@ -14,6 +14,8 @@ File.delete(&quot;elasticsearch.log&quot;) if File.exist?(&quot;elasticsearch.log&quot;)
14 14 Searchkick.client.transport.logger = Logger.new("elasticsearch.log")
15 15 Searchkick.search_timeout = 5
16 16  
  17 +Searchkick.redis = Redis.new if defined?(Redis)
  18 +
17 19 puts "Running against Elasticsearch #{Searchkick.server_version}"
18 20  
19 21 I18n.config.enforce_available_locales = true
... ...