Commit 0614e889419254036bf15e2cb180caecedb75a0e

Authored by Andrew Kane
1 parent c24d3ac7

Added bulk updates

README.md
... ... @@ -406,6 +406,22 @@ There are three strategies for keeping the index synced with your database.
406 406 end
407 407 ```
408 408  
  409 +For better performance, you can also do bulk updates. [master]
  410 +
  411 +```ruby
  412 +Searchkick.callbacks(:bulk) do
  413 + User.find_each(&:update_fields)
  414 +end
  415 +```
  416 +
  417 +Or temporarily skip updates. [master]
  418 +
  419 +```ruby
  420 +Searchkick.callbacks(false) do
  421 + User.find_each(&:update_fields)
  422 +end
  423 +```
  424 +
409 425 #### Associations
410 426  
411 427 Data is **not** automatically synced when an association is updated. If this is desired, add a callback to reindex:
... ... @@ -1129,15 +1145,6 @@ class Product < ActiveRecord::Base
1129 1145 end
1130 1146 ```
1131 1147  
1132   -Turn off callbacks temporarily
1133   -
1134   -```ruby
1135   -Product.disable_search_callbacks # or use Searchkick.disable_callbacks for all models
1136   -ExpensiveProductsTask.execute
1137   -Product.enable_search_callbacks # or use Searchkick.enable_callbacks for all models
1138   -Product.reindex
1139   -```
1140   -
1141 1148 Change timeout
1142 1149  
1143 1150 ```ruby
... ...
lib/searchkick.rb
... ... @@ -55,17 +55,69 @@ module Searchkick
55 55 end
56 56  
57 57 def self.enable_callbacks
58   - Thread.current[:searchkick_callbacks_enabled] = true
  58 + self.callbacks_value = nil
59 59 end
60 60  
61 61 def self.disable_callbacks
62   - Thread.current[:searchkick_callbacks_enabled] = false
  62 + self.callbacks_value = false
63 63 end
64 64  
65 65 def self.callbacks?
66 66 Thread.current[:searchkick_callbacks_enabled].nil? || Thread.current[:searchkick_callbacks_enabled]
67 67 end
68 68  
  69 + def self.callbacks(value)
  70 + if block_given?
  71 + previous_value = callbacks_value
  72 + begin
  73 + self.callbacks_value = value
  74 + yield
  75 + perform_bulk if callbacks_value == :bulk
  76 + ensure
  77 + self.callbacks_value = previous_value
  78 + end
  79 + else
  80 + self.callbacks_value = value
  81 + end
  82 + end
  83 +
  84 + def self.queue_items(items)
  85 + queued_items.concat(items)
  86 + perform_bulk unless callbacks_value == :bulk
  87 + end
  88 +
  89 + def self.perform_bulk
  90 + items = queued_items
  91 + clear_queued_items
  92 + perform_items(items)
  93 + end
  94 +
  95 + def self.perform_items(items)
  96 + if items.any?
  97 + response = client.bulk(body: items)
  98 + if response["errors"]
  99 + first_item = response["items"].first
  100 + raise Searchkick::ImportError, (first_item["index"] || first_item["delete"])["error"]
  101 + end
  102 + end
  103 + end
  104 +
  105 + def self.queued_items
  106 + Thread.current[:searchkick_queued_items] ||= []
  107 + end
  108 +
  109 + def self.clear_queued_items
  110 + Thread.current[:searchkick_queued_items] = []
  111 + end
  112 +
  113 + def self.callbacks_value
  114 + Thread.current[:searchkick_callbacks_enabled]
  115 + end
  116 +
  117 + def self.callbacks_value=(value)
  118 + Thread.current[:searchkick_callbacks_enabled] = value
  119 + end
  120 +
69 121 def self.env
70 122 @env ||= ENV["RAILS_ENV"] || ENV["RACK_ENV"] || "development"
71 123 end
... ...
lib/searchkick/index.rb
... ... @@ -45,38 +45,21 @@ module Searchkick
45 45 # record based
46 46  
47 47 def store(record)
48   - client.index(
49   - index: name,
50   - type: document_type(record),
51   - id: search_id(record),
52   - body: search_data(record)
53   - )
  48 + bulk_index([record])
54 49 end
55 50  
56 51 def remove(record)
57   - id = search_id(record)
58   - unless id.blank?
59   - client.delete(
60   - index: name,
61   - type: document_type(record),
62   - id: id
63   - )
64   - end
  52 + bulk_delete([record])
65 53 end
66 54  
67   - def import(records)
68   - records.group_by { |r| document_type(r) }.each do |type, batch|
69   - response =
70   - client.bulk(
71   - index: name,
72   - type: type,
73   - body: batch.map { |r| {index: {_id: search_id(r), data: search_data(r)}} }
74   - )
75   - if response["errors"]
76   - raise Searchkick::ImportError, response["items"].first["index"]["error"]
77   - end
78   - end
  55 + def bulk_delete(records)
  56 + Searchkick.queue_items(records.reject { |r| r.id.blank? }.map { |r| {delete: {_index: name, _type: document_type(r), _id: search_id(r)}} })
  57 + end
  58 +
  59 + def bulk_index(records)
  60 + Searchkick.queue_items(records.map { |r| {index: {_index: name, _type: document_type(r), _id: search_id(r), data: search_data(r)}} })
79 61 end
  62 + alias_method :import, :bulk_index
80 63  
81 64 def retrieve(record)
82 65 client.get(
... ... @@ -99,10 +82,14 @@ module Searchkick
99 82 end
100 83  
101 84 def reindex_record_async(record)
102   - if defined?(Searchkick::ReindexV2Job)
103   - Searchkick::ReindexV2Job.perform_later(record.class.name, record.id.to_s)
  85 + if Searchkick.callbacks_value.nil?
  86 + if defined?(Searchkick::ReindexV2Job)
  87 + Searchkick::ReindexV2Job.perform_later(record.class.name, record.id.to_s)
  88 + else
  89 + Delayed::Job.enqueue Searchkick::ReindexJob.new(record.class.name, record.id.to_s)
  90 + end
104 91 else
105   - Delayed::Job.enqueue Searchkick::ReindexJob.new(record.class.name, record.id.to_s)
  92 + reindex_record(record)
106 93 end
107 94 end
108 95  
... ...
test/callbacks_test.rb 0 → 100644
... ... @@ -0,0 +1,27 @@
  1 +require_relative "test_helper"
  2 +
  3 +class CallbacksTest < Minitest::Test
  4 + def test_true_create
  5 + Searchkick.callbacks(true) do
  6 + store_names ["Product A", "Product B"]
  7 + end
  8 + Product.searchkick_index.refresh
  9 + assert_search "product", ["Product A", "Product B"]
  10 + end
  11 +
  12 + def test_false_create
  13 + Searchkick.callbacks(false) do
  14 + store_names ["Product A", "Product B"]
  15 + end
  16 + Product.searchkick_index.refresh
  17 + assert_search "product", []
  18 + end
  19 +
  20 + def test_bulk_create
  21 + Searchkick.callbacks(:bulk) do
  22 + store_names ["Product A", "Product B"]
  23 + end
  24 + Product.searchkick_index.refresh
  25 + assert_search "product", ["Product A", "Product B"]
  26 + end
  27 +end
... ...