diff --git a/README.md b/README.md index cf07b92..4971e2b 100644 --- a/README.md +++ b/README.md @@ -406,6 +406,22 @@ There are three strategies for keeping the index synced with your database. end ``` +For better performance, you can also do bulk updates. [master] + +```ruby +Searchkick.callbacks(:bulk) do + User.find_each(&:update_fields) +end +``` + +Or temporarily skip updates. [master] + +```ruby +Searchkick.callbacks(false) do + User.find_each(&:update_fields) +end +``` + #### Associations Data is **not** automatically synced when an association is updated. If this is desired, add a callback to reindex: @@ -1129,15 +1145,6 @@ class Product < ActiveRecord::Base end ``` -Turn off callbacks temporarily - -```ruby -Product.disable_search_callbacks # or use Searchkick.disable_callbacks for all models -ExpensiveProductsTask.execute -Product.enable_search_callbacks # or use Searchkick.enable_callbacks for all models -Product.reindex -``` - Change timeout ```ruby diff --git a/lib/searchkick.rb b/lib/searchkick.rb index 24accb1..3813109 100644 --- a/lib/searchkick.rb +++ b/lib/searchkick.rb @@ -55,17 +55,69 @@ module Searchkick end def self.enable_callbacks - Thread.current[:searchkick_callbacks_enabled] = true + self.callbacks_value = nil end def self.disable_callbacks - Thread.current[:searchkick_callbacks_enabled] = false + self.callbacks_value = false end def self.callbacks? Thread.current[:searchkick_callbacks_enabled].nil? || Thread.current[:searchkick_callbacks_enabled] end + def self.callbacks(value) + if block_given? + previous_value = callbacks_value + begin + self.callbacks_value = value + yield + perform_bulk if callbacks_value == :bulk + ensure + self.callbacks_value = previous_value + end + else + self.callbacks_value = value + end + end + + def self.queue_items(items) + queued_items.concat(items) + perform_bulk unless callbacks_value == :bulk + end + + def self.perform_bulk + items = queued_items + clear_queued_items + perform_items(items) + end + + def self.perform_items(items) + if items.any? + response = client.bulk(body: items) + if response["errors"] + first_item = response["items"].first + raise Searchkick::ImportError, (first_item["index"] || first_item["delete"])["error"] + end + end + end + + def self.queued_items + Thread.current[:searchkick_queued_items] ||= [] + end + + def self.clear_queued_items + Thread.current[:searchkick_queued_items] = [] + end + + def self.callbacks_value + Thread.current[:searchkick_callbacks_enabled] + end + + def self.callbacks_value=(value) + Thread.current[:searchkick_callbacks_enabled] = value + end + def self.env @env ||= ENV["RAILS_ENV"] || ENV["RACK_ENV"] || "development" end diff --git a/lib/searchkick/index.rb b/lib/searchkick/index.rb index 795d306..7126011 100644 --- a/lib/searchkick/index.rb +++ b/lib/searchkick/index.rb @@ -45,38 +45,21 @@ module Searchkick # record based def store(record) - client.index( - index: name, - type: document_type(record), - id: search_id(record), - body: search_data(record) - ) + bulk_index([record]) end def remove(record) - id = search_id(record) - unless id.blank? - client.delete( - index: name, - type: document_type(record), - id: id - ) - end + bulk_delete([record]) end - def import(records) - records.group_by { |r| document_type(r) }.each do |type, batch| - response = - client.bulk( - index: name, - type: type, - body: batch.map { |r| {index: {_id: search_id(r), data: search_data(r)}} } - ) - if response["errors"] - raise Searchkick::ImportError, response["items"].first["index"]["error"] - end - end + def bulk_delete(records) + Searchkick.queue_items(records.reject { |r| r.id.blank? }.map { |r| {delete: {_index: name, _type: document_type(r), _id: search_id(r)}} }) + end + + def bulk_index(records) + Searchkick.queue_items(records.map { |r| {index: {_index: name, _type: document_type(r), _id: search_id(r), data: search_data(r)}} }) end + alias_method :import, :bulk_index def retrieve(record) client.get( @@ -99,10 +82,14 @@ module Searchkick end def reindex_record_async(record) - if defined?(Searchkick::ReindexV2Job) - Searchkick::ReindexV2Job.perform_later(record.class.name, record.id.to_s) + if Searchkick.callbacks_value.nil? + if defined?(Searchkick::ReindexV2Job) + Searchkick::ReindexV2Job.perform_later(record.class.name, record.id.to_s) + else + Delayed::Job.enqueue Searchkick::ReindexJob.new(record.class.name, record.id.to_s) + end else - Delayed::Job.enqueue Searchkick::ReindexJob.new(record.class.name, record.id.to_s) + reindex_record(record) end end diff --git a/test/callbacks_test.rb b/test/callbacks_test.rb new file mode 100644 index 0000000..60eeb8f --- /dev/null +++ b/test/callbacks_test.rb @@ -0,0 +1,27 @@ +require_relative "test_helper" + +class CallbacksTest < Minitest::Test + def test_true_create + Searchkick.callbacks(true) do + store_names ["Product A", "Product B"] + end + Product.searchkick_index.refresh + assert_search "product", ["Product A", "Product B"] + end + + def test_false_create + Searchkick.callbacks(false) do + store_names ["Product A", "Product B"] + end + Product.searchkick_index.refresh + assert_search "product", [] + end + + def test_bulk_create + Searchkick.callbacks(:bulk) do + store_names ["Product A", "Product B"] + end + Product.searchkick_index.refresh + assert_search "product", ["Product A", "Product B"] + end +end -- libgit2 0.21.0