From f075cfd84109b585496d96a484a42052ecf25c8f Mon Sep 17 00:00:00 2001 From: Andrew Kane Date: Sun, 15 Jan 2017 21:12:39 -0800 Subject: [PATCH] Cleaner code --- lib/searchkick/index.rb | 83 ++++++++++++++++++++++++++++++++++++++++++++--------------------------------------- 1 file changed, 44 insertions(+), 39 deletions(-) diff --git a/lib/searchkick/index.rb b/lib/searchkick/index.rb index de9738e..8b7f7f3 100644 --- a/lib/searchkick/index.rb +++ b/lib/searchkick/index.rb @@ -248,8 +248,6 @@ module Searchkick end def import_scope(scope, resume: false, method_name: nil, async: false, batch: false, batch_id: nil, full: false) - batch_size = @options[:batch_size] || 1000 - # use scope for import scope = scope.search_import if scope.respond_to?(:search_import) @@ -257,32 +255,7 @@ module Searchkick import_or_update scope.to_a, method_name, async redis.srem(batches_key, batch_id) if batch_id && redis elsif full && async - if scope.respond_to?(:primary_key) - # TODO expire Redis key - primary_key = scope.primary_key - starting_id = scope.minimum(primary_key) || 0 - max_id = scope.maximum(primary_key) || 0 - batches_count = ((max_id - starting_id + 1) / batch_size.to_f).ceil - - batches_count.times do |i| - batch_id = i + 1 - min_id = starting_id + (i * batch_size) - bulk_reindex_job scope, batch_id, min_id: min_id, max_id: min_id + batch_size - 1 - end - else - batch_id = 1 - items = [] - scope = scope.only(:_id) if scope.respond_to?(:only) - scope.each do |item| - items << item - if items.length == batch_size - bulk_reindex_job scope, batch_id, record_ids: items.map { |i| i.id.to_s } - items = [] - batch_id += 1 - end - end - bulk_reindex_job scope, batch_id, record_ids: items.map { |i| i.id.to_s } - end + full_reindex_async(scope) elsif scope.respond_to?(:find_in_batches) if resume # use total docs instead of max id since there's not a great way @@ -298,18 +271,9 @@ module Searchkick import_or_update batch, method_name, async end else - # https://github.com/karmi/tire/blob/master/lib/tire/model/import.rb - # use cursor for Mongoid - items = [] - # TODO add resume - scope.all.each do |item| - items << item - if items.length == batch_size - import_or_update items, method_name, async - items = [] - end + each_batch(scope) do |items| + import_or_update items, method_name, async end - import_or_update items, method_name, async end end @@ -446,6 +410,43 @@ module Searchkick end end + def full_reindex_async(scope) + if scope.respond_to?(:primary_key) + # TODO expire Redis key + primary_key = scope.primary_key + starting_id = scope.minimum(primary_key) || 0 + max_id = scope.maximum(primary_key) || 0 + batches_count = ((max_id - starting_id + 1) / batch_size.to_f).ceil + + batches_count.times do |i| + batch_id = i + 1 + min_id = starting_id + (i * batch_size) + bulk_reindex_job scope, batch_id, min_id: min_id, max_id: min_id + batch_size - 1 + end + else + batch_id = 1 + scope = scope.only(:_id) if scope.respond_to?(:only) + each_batch(scope) do |items| + bulk_reindex_job scope, batch_id, record_ids: items.map { |i| i.id.to_s } + batch_id += 1 + end + end + end + + def each_batch(scope) + # https://github.com/karmi/tire/blob/master/lib/tire/model/import.rb + # use cursor for Mongoid + items = [] + scope.all.each do |item| + items << item + if items.length == batch_size + yield items + items = [] + end + end + yield items if items.any? + end + def bulk_reindex_job(scope, batch_id, options) Searchkick::BulkReindexJob.perform_later({ class_name: scope.model_name.name, @@ -455,6 +456,10 @@ module Searchkick redis.sadd(batches_key, batch_id) if redis end + def batch_size + @batch_size ||= @options[:batch_size] || 1000 + end + def with_retries retries = 0 -- libgit2 0.21.0