module Searchkick
  class Index
    include IndexOptions

    attr_reader :name, :options

    def initialize(name, options = {})
      @name = name
      @options = options
    end

    def create(body = {})
      client.indices.create index: name, body: body
    end

    def delete
      client.indices.delete index: name
    end

    def exists?
      client.indices.exists index: name
    end

    def refresh
      client.indices.refresh index: name
    end

    def alias_exists?
      client.indices.exists_alias name: name
    end

    def mapping
      client.indices.get_mapping index: name
    end

    def settings
      client.indices.get_settings index: name
    end

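    # Atomically repoint the alias +name+ at +new_name+: remove it from any
    # indices it currently points to and add the new index in a single
    # update_aliases call. Example (hypothetical index name):
    #
    #   Searchkick::Index.new("products").swap("products_20180101000000000")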
    def swap(new_name)
      old_indices =
        begin
          client.indices.get_alias(name: name).keys
        rescue Elasticsearch::Transport::Transport::Errors::NotFound
          []
        end
      actions = old_indices.map { |old_name| {remove: {index: old_name, alias: name}} } + [{add: {index: new_name, alias: name}}]
      client.indices.update_aliases body: {actions: actions}
    end

    # record based

    def store(record)
      bulk_index([record])
    end

    def remove(record)
      bulk_delete([record])
    end

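    # bulk helpers: build one bulk operation (delete / index / update) per record
    # and queue them with Searchkick.indexer, which performs them against
    # Elasticsearch's bulk API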
    def bulk_delete(records)
      Searchkick.indexer.queue(records.reject { |r| r.id.blank? }.map { |r| {delete: record_data(r)} })
    end

    def bulk_index(records)
      Searchkick.indexer.queue(records.map { |r| {index: record_data(r).merge(data: search_data(r))} })
    end
    alias_method :import, :bulk_index

    def bulk_update(records, method_name)
      Searchkick.indexer.queue(records.map { |r| {update: record_data(r).merge(data: {doc: search_data(r, method_name)})} })
    end

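    # metadata shared by every bulk operation for a record: target index, id,
    # type, and routing when the record defines search_routing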
    def record_data(r)
      data = {
        _index: name,
        _id: search_id(r),
        _type: document_type(r)
      }
      data[:_routing] = r.search_routing if r.respond_to?(:search_routing)
      data
    end

    def retrieve(record)
      client.get(
        index: name,
        type: document_type(record),
        id: search_id(record)
      )["_source"]
    end

    def reindex_record(record)
      if record.destroyed? || !record.should_index?
        begin
          remove(record)
        rescue Elasticsearch::Transport::Transport::Errors::NotFound
          # do nothing
        end
      else
        store(record)
      end
    end

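    # when no explicit callbacks mode is set, reindex the record in the
    # background via Active Job; otherwise fall back to an inline reindex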
    def reindex_record_async(record)
      if Searchkick.callbacks_value.nil?
        if defined?(Searchkick::ReindexV2Job)
          Searchkick::ReindexV2Job.perform_later(record.class.name, record.id.to_s)
        else
          raise Searchkick::Error, "Active Job not found"
        end
      else
        reindex_record(record)
      end
    end

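    # "more like this" style search: the record's stored _source (optionally
    # limited to :fields) becomes the query text, and the record itself is
    # excluded from the results via the _id filter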
    def similar_record(record, **options)
      like_text = retrieve(record).to_hash
        .keep_if { |k, _| !options[:fields] || options[:fields].map(&:to_s).include?(k) }
        .values.compact.join(" ")

      # TODO deep merge method
      options[:where] ||= {}
      options[:where][:_id] ||= {}
      options[:where][:_id][:not] = record.id.to_s
      options[:per_page] ||= 10
      options[:similar] = true

      # TODO use index class instead of record class
      search_model(record.class, like_text, options)
    end

    # search

    def search_model(searchkick_klass, term = "*", **options, &block)
      query = Searchkick::Query.new(searchkick_klass, term, options)
      yield(query.body) if block
      if options[:execute] == false
        query
      else
        query.execute
      end
    end

    # reindex

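    # create a fresh, timestamp-suffixed index (e.g. products_20180101000000000)
    # with the given or configured index options; used by reindex_scope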
    def create_index(index_options: nil)
      index_options ||= self.index_options
      index = Searchkick::Index.new("#{name}_#{Time.now.strftime('%Y%m%d%H%M%S%L')}", @options)
      index.create(index_options)
      index
    end

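    # indices created for this alias, i.e. those matching the name_<timestamp>
    # pattern; pass unaliased: true to get only indices the alias no longer
    # points to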
    def all_indices(unaliased: false)
      indices =
        begin
          client.indices.get_aliases
        rescue Elasticsearch::Transport::Transport::Errors::NotFound
          {}
        end
      indices = indices.select { |_k, v| v.empty? || v["aliases"].empty? } if unaliased
      indices.select { |k, _v| k =~ /\A#{Regexp.escape(name)}_\d{14,17}\z/ }.keys
    end

    # remove old, unaliased indices that start with the index name
    def clean_indices
      indices = all_indices(unaliased: true)
      indices.each do |index|
        Searchkick::Index.new(index).delete
      end
      indices
    end

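    # number of documents in the index, via a size: 0 match_all search;
    # also used as a rough resume point for interrupted imports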
    def total_docs
      response =
        client.search(
          index: name,
          body: {
            query: {match_all: {}},
            size: 0
          }
        )

      response["hits"]["total"]
    end

    # https://gist.github.com/jarosan/3124884
    # http://www.elasticsearch.org/blog/changing-mapping-with-zero-downtime/
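    # zero-downtime reindex: create a new timestamped index, import the scope
    # into it, then swap the alias over and clean up old indices (when no alias
    # exists yet, the swap happens before the import)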
    def reindex_scope(scope, import: true, resume: false)
      if resume
        index_name = all_indices.sort.last
        raise Searchkick::Error, "No index to resume" unless index_name
        index = Searchkick::Index.new(index_name)
      else
        clean_indices

        index = create_index(index_options: scope.searchkick_index_options)
      end

      # check if alias exists
      if alias_exists?
        # import before swap
        index.import_scope(scope, resume: resume) if import

        # get existing indices to remove
        swap(index.name)
        clean_indices
      else
        delete if exists?
        swap(index.name)

        # import after swap
        index.import_scope(scope, resume: resume) if import
      end

      index.refresh

      true
    end

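    # import records in batches: find_in_batches for ActiveRecord-style scopes
    # (optionally spread across a thread pool via the :threads option), or a
    # manual cursor-based loop otherwise (e.g. Mongoid)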
    def import_scope(scope, resume: false, method_name: nil)
      batch_size = @options[:batch_size] || 1000

      # use scope for import
      scope = scope.search_import if scope.respond_to?(:search_import)
      if scope.respond_to?(:find_in_batches)
        if resume
          # use total docs instead of max id since there's no great way
          # to get the max _id without scripting (it's a string)

          # TODO use primary key and prefix with table name
          scope = scope.where("id > ?", total_docs)
        end

        pool =
          if options[:threads]
            require "thread/pool"
            Thread.pool(options[:threads])
          end

        scope.find_in_batches batch_size: batch_size do |batch|
          process_batch(pool) do
            import_or_update batch.select(&:should_index?), method_name
          end
        end

        pool.shutdown if pool
      else
        # https://github.com/karmi/tire/blob/master/lib/tire/model/import.rb
        # use cursor for Mongoid
        items = []
        # TODO add resume
        scope.all.each do |item|
          items << item if item.should_index?
          if items.length == batch_size
            import_or_update items, method_name
            items = []
          end
        end
        import_or_update items, method_name
      end
    end

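    # perform a bulk index or partial update, retrying once on
    # Faraday::ClientError before re-raising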
    def import_or_update(records, method_name)
      retries = 0
      begin
        method_name ? bulk_update(records, method_name) : import(records)
      rescue Faraday::ClientError => e
        if retries < 1
          retries += 1
          retry
        end
        raise e
      end
    end

    # other

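    # run text through the analyze API and return just the token strings.
    # Example (output is illustrative):
    #
    #   index.tokens("Fresh Honey", analyzer: "searchkick_search")
    #   # => ["fresh", "honey"]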
    def tokens(text, options = {})
      client.indices.analyze({text: text, index: name}.merge(options))["tokens"].map { |t| t["token"] }
    end

    def klass_document_type(klass)
      if klass.respond_to?(:document_type)
        klass.document_type
      else
        klass.model_name.to_s.underscore
      end
    end

    protected

    def client
      Searchkick.client
    end

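    # per-record document type override, falling back to the class-level type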
    def document_type(record)
      if record.respond_to?(:search_document_type)
        record.search_document_type
      else
        klass_document_type(record.class)
      end
    end

    def search_id(record)
      id = record.respond_to?(:search_document_id) ? record.search_document_id : record.id
      id.is_a?(Numeric) ? id : id.to_s
    end

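    # build the _source document for a record: call its search_data method (or
    # the given partial-reindex method), stringify keys, drop reserved fields,
    # then normalize conversions, suggest, location, and BigDecimal values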
    def search_data(record, method_name = nil)
      partial_reindex = !method_name.nil?
      source = record.send(method_name || :search_data)
      options = record.class.searchkick_options

      # stringify fields
      # remove _id since search_id is used instead
      source = source.each_with_object({}) { |(k, v), memo| memo[k.to_s] = v }.except("_id", "_type")

      # conversions
      Array(options[:conversions]).map(&:to_s).each do |conversions_field|
        if source[conversions_field]
          source[conversions_field] = source[conversions_field].map { |k, v| {query: k, count: v} }
        end
      end

      # hack to prevent "field doesn't exist" errors for suggest fields
      (options[:suggest] || []).map(&:to_s).each do |field|
        source[field] = nil if !source[field] && !partial_reindex
      end

      # locations
      (options[:locations] || []).map(&:to_s).each do |field|
        if source[field]
          if !source[field].is_a?(Hash) && (source[field].first.is_a?(Array) || source[field].first.is_a?(Hash))
            # multiple locations
            source[field] = source[field].map { |a| location_value(a) }
          else
            source[field] = location_value(source[field])
          end
        end
      end

      cast_big_decimal(source)

      source.as_json
    end

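    # normalize a location for a geo_point field: [lat, lon] arrays are cast to
    # floats and reversed (Elasticsearch expects [lon, lat] in array form),
    # hashes are coerced to {lat:, lon:} floats, anything else passes through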
    def location_value(value)
      if value.is_a?(Array)
        value.map(&:to_f).reverse
      elsif value.is_a?(Hash)
        {lat: value[:lat].to_f, lon: value[:lon].to_f}
      else
        value
      end
    end

    # change all BigDecimal values to floats due to
    # https://github.com/rails/rails/issues/6033
    # possible loss of precision :/
    def cast_big_decimal(obj)
      case obj
      when BigDecimal
        obj.to_f
      when Hash
        obj.each do |k, v|
          obj[k] = cast_big_decimal(v)
        end
      when Enumerable
        obj.map do |v|
          cast_big_decimal(v)
        end
      else
        obj
      end
    end

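    # run the block on the thread pool when one was created, otherwise inline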
    def process_batch(pool)
      if pool
        pool.process { yield }
      else
        yield
      end
    end
  end
end