# relation_indexer.rb 4.42 KB
module Searchkick
  # Walks a relation in batches and either hands each batch to a
  # RecordIndexer or, for a full async reindex, enqueues one
  # BulkReindexJob per batch while tracking outstanding batch ids in Redis.
  class RelationIndexer
    attr_reader :index

    # index - the index object this indexer works against; this class reads
    #         its `name`, `options` and `total_docs`.
    def initialize(index)
      @index = index
    end

    # Reindexes every record in +relation+.
    #
    # relation    - relation/criteria to walk
    # mode        - only :async is inspected here; the value is forwarded
    #               unchanged to RecordIndexer#reindex
    # method_name - forwarded unchanged to RecordIndexer#reindex
    # full        - forwarded to RecordIndexer#reindex; combined with
    #               mode: :async it switches to one queued job per batch
    # resume      - when truthy, restricts the relation via resume_relation
    # scope       - name of a method to call on the relation; takes
    #               precedence over the relation's `search_import`
    #
    # Returns whatever the batch iteration returns.
    def reindex(relation, mode:, method_name: nil, full: false, resume: false, scope: nil)
      relation = scoped_for_import(relation, scope)

      async = mode == :async
      # remove unneeded loading for async
      relation = identifiers_only(relation) if async

      # note: this returns before `resume` is looked at
      return full_reindex_async(relation) if async && full

      relation = resume_relation(relation) if resume

      indexer = RecordIndexer.new(index)
      in_batches(relation) do |batch|
        indexer.reindex(batch, mode: mode, method_name: method_name, full: full)
      end
    end

    # Number of batch ids currently stored in this index's Redis batch set.
    def batches_left
      Searchkick.with_redis do |redis|
        redis.scard(batches_key)
      end
    end

    # Removes +batch_id+ from this index's Redis batch set.
    def batch_completed(batch_id)
      Searchkick.with_redis do |redis|
        redis.srem(batches_key, batch_id)
      end
    end

    private

    # Applies the explicit scope when one is given, otherwise the relation's
    # `search_import` when it defines one, otherwise leaves it untouched.
    def scoped_for_import(relation, scope)
      return relation.send(scope) if scope
      return relation.search_import if relation.respond_to?(:search_import)

      relation
    end

    # Narrows the relation to its identifier column: primary key (dropping
    # includes/preload) when the relation exposes `primary_key`, `_id` when
    # it only exposes `only`, and no change otherwise.
    def identifiers_only(relation)
      if relation.respond_to?(:primary_key)
        key = relation.primary_key
        relation.select(key).except(:includes, :preload)
      elsif relation.respond_to?(:only)
        relation.only(:_id)
      else
        relation
      end
    end

    # Restricts the relation to rows whose primary key is greater than the
    # index's total document count.
    #
    # Raises Error when the relation has no `primary_key`.
    def resume_relation(relation)
      unless relation.respond_to?(:primary_key)
        raise Error, "Resume not supported for Mongoid"
      end

      # use total docs instead of max id since there's not a great way
      # to get the max _id without scripting since it's a string
      key_column = relation.arel_table[relation.primary_key]
      relation.where(key_column.gt(index.total_docs))
    end

    # Yields the relation's records in arrays of at most `batch_size`,
    # with the model's current scope cleared while the caller's block runs.
    def in_batches(relation, &block)
      if relation.respond_to?(:find_in_batches)
        active_record_batches(relation, &block)
      else
        mongoid_batches(relation, &block)
      end
    end

    # Batch iteration for relations that implement `find_in_batches`.
    def active_record_batches(relation)
      model = relation.klass
      # remove order to prevent possible warnings
      relation.except(:order).find_in_batches(batch_size: batch_size) do |records|
        # prevent scope from affecting search_data as well as inline jobs
        # Active Record runs relation calls in scoping block
        # https://github.com/rails/rails/blob/main/activerecord/lib/active_record/relation/delegation.rb
        # note: we could probably just call klass.current_scope = nil
        # anywhere in reindex method (after initial all call),
        # but this is more cautious
        outer_scope = model.current_scope(true)
        next yield(records) unless outer_scope

        begin
          model.current_scope = nil
          yield records
        ensure
          # put the scope back even when the caller's block raises
          model.current_scope = outer_scope
        end
      end
    end

    # Batch iteration for everything else (treated as Mongoid).
    def mongoid_batches(relation)
      model = relation.klass
      each_batch(relation, batch_size: batch_size) do |documents|
        # prevent scope from affecting search_data as well as inline jobs
        # note: Model.with_scope doesn't always restore scope, so use custom logic
        outer_scope = Mongoid::Threaded.current_scope(model)
        next yield(documents) unless outer_scope

        begin
          Mongoid::Threaded.set_current_scope(nil, model)
          yield documents
        ensure
          # put the scope back even when the caller's block raises
          Mongoid::Threaded.set_current_scope(outer_scope, model)
        end
      end
    end

    # Iterates `relation.all`, yielding arrays of +batch_size+ items plus a
    # final shorter array for any remainder.
    def each_batch(relation, batch_size:)
      # https://github.com/karmi/tire/blob/master/lib/tire/model/import.rb
      # use cursor for Mongoid
      buffer = []
      relation.all.each do |document|
        buffer << document
        next unless buffer.length == batch_size

        yield buffer
        # start a fresh array rather than clearing the one just yielded
        buffer = []
      end
      yield buffer if buffer.any?
    end

    # Batch size from the index options, defaulting to 1000; memoized.
    def batch_size
      @batch_size ||= begin
        configured = index.options[:batch_size]
        configured || 1_000
      end
    end

    # Enqueues one job per batch, numbering batches from 1.
    def full_reindex_async(relation)
      class_name = relation.searchkick_options[:class_name]
      batch_id = 1

      in_batches(relation) do |records|
        ids = records.map(&:id)
        batch_job(class_name, batch_id, ids)
        batch_id += 1
      end
    end

    # Records +batch_id+ in the Redis batch set, then enqueues the job.
    def batch_job(class_name, batch_id, record_ids)
      Searchkick.with_redis do |redis|
        redis.sadd(batches_key, batch_id)
      end

      Searchkick::BulkReindexJob.perform_later(
        class_name: class_name,
        index_name: index.name,
        batch_id: batch_id,
        record_ids: record_ids.map { |id| job_safe_id(id) }
      )
    end

    # Integers pass through as-is; every other id is converted with to_s.
    def job_safe_id(id)
      id.instance_of?(Integer) ? id : id.to_s
    end

    # Redis key of the set holding this index's outstanding batch ids.
    def batches_key
      "searchkick:reindex:#{index.name}:batches"
    end
  end
end