diff --git a/.travis.yml b/.travis.yml
index 28c077b..715ec06 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -3,6 +3,7 @@ language: ruby
 rvm: 2.3.1
 services:
   - mongodb
+  - redis-server
 before_install:
   - ./test/ci/before_install.sh
 script: RUBYOPT=W0 bundle exec rake test
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 27e69db..3be5c1d 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,3 +1,7 @@
+## 2.0.4 [unreleased]
+
+- Added support for queuing updates [experimental]
+
 ## 2.0.3
 
 - Added `async` option to `reindex` [experimental]
diff --git a/Gemfile b/Gemfile
index 113bcdb..7b0e8f4 100644
--- a/Gemfile
+++ b/Gemfile
@@ -8,3 +8,4 @@ gem "activerecord", "~> 5.0.0"
 gem "gemoji-parser"
 gem "typhoeus"
 gem "activejob"
+gem "redis"
diff --git a/README.md b/README.md
index 3bbf79c..c5dfd5e 100644
--- a/README.md
+++ b/README.md
@@ -1185,6 +1185,28 @@ And use:
 Searchkick.reindex_status(index_name)
 ```
 
+### Queues [master, experimental, ActiveRecord only]
+
+You can also queue updates and do them in bulk for better performance. First, set up Redis in an initializer.
+
+```ruby
+Searchkick.redis = Redis.new
+```
+
+And ask your models to queue updates.
+
+```ruby
+class Product < ActiveRecord::Base
+  searchkick callbacks: :queue
+end
+```
+
+Then, set up a background job to run.
+
+```ruby
+Searchkick::ProcessQueueJob.perform_later(class_name: "Product")
+```
+
 For more tips, check out [Keeping Elasticsearch in Sync](https://www.elastic.co/blog/found-keeping-elasticsearch-in-sync).
 
 ## Advanced
diff --git a/lib/searchkick.rb b/lib/searchkick.rb
index 03be9fb..212e441 100644
--- a/lib/searchkick.rb
+++ b/lib/searchkick.rb
@@ -5,6 +5,7 @@ require "searchkick/version"
 require "searchkick/index_options"
 require "searchkick/index"
 require "searchkick/indexer"
+require "searchkick/reindex_queue"
 require "searchkick/results"
 require "searchkick/query"
 require "searchkick/model"
@@ -21,6 +22,8 @@ rescue LoadError
 end
 
 if defined?(ActiveJob)
   require "searchkick/bulk_reindex_job"
+  require "searchkick/process_queue_job"
+  require "searchkick/process_batch_job"
   require "searchkick/reindex_v2_job"
 end
diff --git a/lib/searchkick/model.rb b/lib/searchkick/model.rb
index 88dddcf..dac21cb 100644
--- a/lib/searchkick/model.rb
+++ b/lib/searchkick/model.rb
@@ -84,11 +84,36 @@ module Searchkick
         after_destroy callback_name, if: proc { self.class.search_callbacks? }
       end
 
-      def reindex(method_name = nil, refresh: false, async: false)
-        if async
+      def reindex(method_name = nil, refresh: false, async: false, mode: nil)
+        klass_options = self.class.searchkick_index.options
+
+        if mode.nil?
+          mode =
+            if async
+              :async
+            elsif Searchkick.callbacks_value
+              Searchkick.callbacks_value
+            elsif klass_options.key?(:callbacks) && klass_options[:callbacks] != :async
+              # TODO remove 2nd condition in next major version
+              klass_options[:callbacks]
+            end
+        end
+
+        case mode
+        when :queue
+          if method_name
+            raise Searchkick::Error, "Partial reindex not supported with queue option"
+          else
+            Searchkick::ReindexQueue.new(self.class.searchkick_index.name).push(id.to_s)
+          end
+        when :async
           if method_name
             # TODO support Mongoid and NoBrainer and non-id primary keys
-            Searchkick::BulkReindexJob.perform_later(class_name: self.class.name, record_ids: [id.to_s], method_name: method_name ? method_name.to_s : nil)
+            Searchkick::BulkReindexJob.perform_later(
+              class_name: self.class.name,
+              record_ids: [id.to_s],
+              method_name: method_name ? method_name.to_s : nil
+            )
           else
             self.class.searchkick_index.reindex_record_async(self)
           end
diff --git a/lib/searchkick/process_batch_job.rb b/lib/searchkick/process_batch_job.rb
new file mode 100644
index 0000000..44cb1f3
--- /dev/null
+++ b/lib/searchkick/process_batch_job.rb
@@ -0,0 +1,23 @@
+module Searchkick
+  class ProcessBatchJob < ActiveJob::Base
+    queue_as :searchkick
+
+    def perform(class_name:, record_ids:)
+      klass = class_name.constantize
+      scope = klass.where(klass.primary_key => record_ids)
+      scope = scope.search_import if scope.respond_to?(:search_import)
+      records = scope.select(&:should_index?)
+
+      # determine which records to delete
+      delete_ids = record_ids - records.map { |r| r.id.to_s }
+      delete_records = delete_ids.map { |id| m = klass.new; m.id = id; m }
+
+      # bulk reindex
+      index = klass.searchkick_index
+      Searchkick.callbacks(:bulk) do
+        index.bulk_index(records)
+        index.bulk_delete(delete_records)
+      end
+    end
+  end
+end
diff --git a/lib/searchkick/process_queue_job.rb b/lib/searchkick/process_queue_job.rb
new file mode 100644
index 0000000..fd6e688
--- /dev/null
+++ b/lib/searchkick/process_queue_job.rb
@@ -0,0 +1,23 @@
+module Searchkick
+  class ProcessQueueJob < ActiveJob::Base
+    queue_as :searchkick
+
+    def perform(class_name:)
+      model = class_name.constantize
+
+      limit = 1000
+      record_ids = Searchkick::ReindexQueue.new(model.searchkick_index.name).reserve(limit: limit)
+      if record_ids.any?
+        Searchkick::ProcessBatchJob.perform_later(
+          class_name: model.name,
+          record_ids: record_ids
+        )
+        # TODO when moving to reliable queuing, mark as complete
+
+        if record_ids.size == limit
+          Searchkick::ProcessQueueJob.perform_later(class_name: class_name)
+        end
+      end
+    end
+  end
+end
diff --git a/lib/searchkick/reindex_queue.rb b/lib/searchkick/reindex_queue.rb
new file mode 100644
index 0000000..36a4aeb
--- /dev/null
+++ b/lib/searchkick/reindex_queue.rb
@@ -0,0 +1,40 @@
+module Searchkick
+  class ReindexQueue
+    attr_reader :name
+
+    def initialize(name)
+      @name = name
+    end
+
+    def push(record_id)
+      redis.lpush(redis_key, record_id)
+    end
+
+    # TODO use reliable queuing
+    def reserve(limit: 1000)
+      record_ids = Set.new
+      while record_ids.size < limit && record_id = redis.rpop(redis_key)
+        record_ids << record_id
+      end
+      record_ids.to_a
+    end
+
+    def clear
+      redis.del(redis_key)
+    end
+
+    def length
+      redis.llen(redis_key)
+    end
+
+    private
+
+    def redis
+      Searchkick.redis
+    end
+
+    def redis_key
+      "searchkick:reindex_queue:#{name}"
+    end
+  end
+end
diff --git a/test/callbacks_test.rb b/test/callbacks_test.rb
index 60eeb8f..8a5d834 100644
--- a/test/callbacks_test.rb
+++ b/test/callbacks_test.rb
@@ -24,4 +24,36 @@ class CallbacksTest < Minitest::Test
     Product.searchkick_index.refresh
     assert_search "product", ["Product A", "Product B"]
   end
+
+  def test_queue
+    skip unless defined?(ActiveJob) && defined?(Redis)
+
+    reindex_queue = Searchkick::ReindexQueue.new(Product.searchkick_index.name)
+    reindex_queue.clear
+
+    Searchkick.callbacks(:queue) do
+      store_names ["Product A", "Product B"]
+    end
+    Product.searchkick_index.refresh
+    assert_search "product", [], load: false
+    assert_equal 2, reindex_queue.length
+
+    Searchkick::ProcessQueueJob.perform_later(class_name: "Product")
+    Product.searchkick_index.refresh
+    assert_search "product", ["Product A", "Product B"], load: false
+    assert_equal 0, reindex_queue.length
+
+    Searchkick.callbacks(:queue) do
+      Product.where(name: "Product B").destroy_all
+      Product.create!(name: "Product C")
+    end
+    Product.searchkick_index.refresh
+    assert_search "product", ["Product A", "Product B"], load: false
+    assert_equal 2, reindex_queue.length
+
+    Searchkick::ProcessQueueJob.perform_later(class_name: "Product")
+    Product.searchkick_index.refresh
+    assert_search "product", ["Product A", "Product C"], load: false
+    assert_equal 0, reindex_queue.length
+  end
 end
diff --git a/test/test_helper.rb b/test/test_helper.rb
index 81b239b..06fbce7 100644
--- a/test/test_helper.rb
+++ b/test/test_helper.rb
@@ -14,6 +14,8 @@ File.delete("elasticsearch.log") if File.exist?("elasticsearch.log")
 Searchkick.client.transport.logger = Logger.new("elasticsearch.log")
 Searchkick.search_timeout = 5
 
+Searchkick.redis = Redis.new if defined?(Redis)
+
 puts "Running against Elasticsearch #{Searchkick.server_version}"
 
 I18n.config.enforce_available_locales = true
-- 
libgit2 0.21.0
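Usage note (not part of the patch above): the README addition shows a single `Searchkick::ProcessQueueJob.perform_later` call, but in practice the job has to run on a schedule so queued ids keep getting drained. A minimal sketch of how the pieces introduced by this patch fit together, assuming Redis on its default port and a `Product` model declared with `searchkick callbacks: :queue`; the `drain_searchkick_queues` helper and the hard-coded model list are hypothetical, and any scheduler (cron, clockwork, sidekiq-cron) could invoke it:

```ruby
require "searchkick"
require "redis"

# Initializer: point Searchkick at Redis so ReindexQueue#push has a backend.
Searchkick.redis = Redis.new

# Hypothetical helper for a scheduled task: drain the reindex queue for each
# model that queues updates. ProcessQueueJob reserves up to 1000 ids, hands
# them to ProcessBatchJob, and re-enqueues itself when a full batch suggests
# more ids are still waiting.
def drain_searchkick_queues(class_names)
  class_names.each do |class_name|
    queue = Searchkick::ReindexQueue.new(class_name.constantize.searchkick_index.name)
    next if queue.length.zero? # nothing queued for this model
    Searchkick::ProcessQueueJob.perform_later(class_name: class_name)
  end
end

drain_searchkick_queues(["Product"])
```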