Commit b294478d5bd925e628b6cedd279c808768ad9a89

Authored by Andrew Kane
1 parent 08feab18
Exists in index_name

Added index_name to jobs

lib/searchkick/index.rb
@@ -259,6 +259,10 @@ module Searchkick @@ -259,6 +259,10 @@ module Searchkick
259 @bulk_indexer ||= BulkIndexer.new(self) 259 @bulk_indexer ||= BulkIndexer.new(self)
260 end 260 end
261 261
  262 + def import_before_promotion(index, relation, **import_options)
  263 + index.import_scope(relation, **import_options)
  264 + end
  265 +
262 # https://gist.github.com/jarosan/3124884 266 # https://gist.github.com/jarosan/3124884
263 # http://www.elasticsearch.org/blog/changing-mapping-with-zero-downtime/ 267 # http://www.elasticsearch.org/blog/changing-mapping-with-zero-downtime/
264 def reindex_scope(relation, import: true, resume: false, retain: false, async: false, refresh_interval: nil, scope: nil) 268 def reindex_scope(relation, import: true, resume: false, retain: false, async: false, refresh_interval: nil, scope: nil)
@@ -284,8 +288,7 @@ module Searchkick @@ -284,8 +288,7 @@ module Searchkick
284 # check if alias exists 288 # check if alias exists
285 alias_exists = alias_exists? 289 alias_exists = alias_exists?
286 if alias_exists 290 if alias_exists
287 - # import before promotion  
288 - index.import_scope(relation, **import_options) if import 291 + import_before_promotion(index, relation, **import_options) if import
289 292
290 # get existing indices to remove 293 # get existing indices to remove
291 unless async 294 unless async
lib/searchkick/model.rb
@@ -15,6 +15,7 @@ module Searchkick @@ -15,6 +15,7 @@ module Searchkick
15 Searchkick.models << self 15 Searchkick.models << self
16 16
17 options[:_type] ||= -> { searchkick_index.klass_document_type(self, true) } 17 options[:_type] ||= -> { searchkick_index.klass_document_type(self, true) }
  18 + options[:class_name] = name
18 19
19 callbacks = options.key?(:callbacks) ? options[:callbacks] : :inline 20 callbacks = options.key?(:callbacks) ? options[:callbacks] : :inline
20 unless [:inline, true, false, :async, :queue].include?(callbacks) 21 unless [:inline, true, false, :async, :queue].include?(callbacks)
@@ -44,8 +45,8 @@ module Searchkick @@ -44,8 +45,8 @@ module Searchkick
44 end 45 end
45 alias_method Searchkick.search_method_name, :searchkick_search if Searchkick.search_method_name 46 alias_method Searchkick.search_method_name, :searchkick_search if Searchkick.search_method_name
46 47
47 - def searchkick_index  
48 - index = class_variable_get(:@@searchkick_index) 48 + def searchkick_index(name: nil)
  49 + index = name || class_variable_get(:@@searchkick_index)
49 index = index.call if index.respond_to?(:call) 50 index = index.call if index.respond_to?(:call)
50 index_cache = class_variable_get(:@@searchkick_index_cache) 51 index_cache = class_variable_get(:@@searchkick_index_cache)
51 index_cache[index] ||= Searchkick::Index.new(index, searchkick_options) 52 index_cache[index] ||= Searchkick::Index.new(index, searchkick_options)
lib/searchkick/process_batch_job.rb
@@ -2,7 +2,7 @@ module Searchkick @@ -2,7 +2,7 @@ module Searchkick
2 class ProcessBatchJob < ActiveJob::Base 2 class ProcessBatchJob < ActiveJob::Base
3 queue_as { Searchkick.queue_name } 3 queue_as { Searchkick.queue_name }
4 4
5 - def perform(class_name:, record_ids:) 5 + def perform(class_name:, record_ids:, index_name: nil)
6 # separate routing from id 6 # separate routing from id
7 routing = Hash[record_ids.map { |r| r.split(/(?<!\|)\|(?!\|)/, 2).map { |v| v.gsub("||", "|") } }] 7 routing = Hash[record_ids.map { |r| r.split(/(?<!\|)\|(?!\|)/, 2).map { |v| v.gsub("||", "|") } }]
8 record_ids = routing.keys 8 record_ids = routing.keys
@@ -26,7 +26,7 @@ module Searchkick @@ -26,7 +26,7 @@ module Searchkick
26 end 26 end
27 27
28 # bulk reindex 28 # bulk reindex
29 - index = klass.searchkick_index 29 + index = klass.searchkick_index(name: index_name)
30 Searchkick.callbacks(:bulk) do 30 Searchkick.callbacks(:bulk) do
31 index.bulk_index(records) if records.any? 31 index.bulk_index(records) if records.any?
32 index.bulk_delete(delete_records) if delete_records.any? 32 index.bulk_delete(delete_records) if delete_records.any?
lib/searchkick/process_queue_job.rb
@@ -2,16 +2,19 @@ module Searchkick @@ -2,16 +2,19 @@ module Searchkick
2 class ProcessQueueJob < ActiveJob::Base 2 class ProcessQueueJob < ActiveJob::Base
3 queue_as { Searchkick.queue_name } 3 queue_as { Searchkick.queue_name }
4 4
5 - def perform(class_name:) 5 + def perform(class_name:, index_name: nil, inline: false)
6 model = class_name.constantize 6 model = class_name.constantize
7 limit = model.searchkick_index.options[:batch_size] || 1000 7 limit = model.searchkick_index.options[:batch_size] || 1000
8 8
9 loop do 9 loop do
10 - record_ids = model.searchkick_index.reindex_queue.reserve(limit: limit) 10 + record_ids = model.searchkick_index(name: index_name).reindex_queue.reserve(limit: limit)
11 if record_ids.any? 11 if record_ids.any?
12 - Searchkick::ProcessBatchJob.perform_later(  
13 - class_name: model.name,  
14 - record_ids: record_ids 12 + perform_method = inline ? :perform_now : :perform_later
  13 + Searchkick::ProcessBatchJob.send(
  14 + perform_method,
  15 + class_name: class_name,
  16 + record_ids: record_ids,
  17 + index_name: index_name
15 ) 18 )
16 # TODO when moving to reliable queuing, mark as complete 19 # TODO when moving to reliable queuing, mark as complete
17 end 20 end