From 7b06dbe4cdec37ff1349ccf01bf8cb3295971007 Mon Sep 17 00:00:00 2001 From: Andrew Kane Date: Tue, 17 Jan 2017 22:25:33 -0800 Subject: [PATCH] Added support for connection_pool --- CHANGELOG.md | 1 + Gemfile | 1 + lib/searchkick.rb | 12 ++++++++++++ lib/searchkick/index.rb | 10 +++------- lib/searchkick/reindex_queue.rb | 14 +++++--------- test/test_helper.rb | 8 +++++++- 6 files changed, 29 insertions(+), 17 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 4d8d326..518ab6f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,6 +1,7 @@ ## 2.1.1 [unreleased] - Fixed duplicate notifications +- Added support for `connection_pool` ## 2.1.0 diff --git a/Gemfile b/Gemfile index 7b0e8f4..adaa4b3 100644 --- a/Gemfile +++ b/Gemfile @@ -9,3 +9,4 @@ gem "gemoji-parser" gem "typhoeus" gem "activejob" gem "redis" +gem "connection_pool" diff --git a/lib/searchkick.rb b/lib/searchkick.rb index aefecb2..c378d86 100644 --- a/lib/searchkick.rb +++ b/lib/searchkick.rb @@ -145,6 +145,18 @@ module Searchkick end end + def self.with_redis + if redis + if redis.respond_to?(:with) + redis.with do |r| + yield r + end + else + yield redis + end + end + end + # private def self.load_records(records, ids) records = diff --git a/lib/searchkick/index.rb b/lib/searchkick/index.rb index 9eeddf5..47675c3 100644 --- a/lib/searchkick/index.rb +++ b/lib/searchkick/index.rb @@ -258,7 +258,7 @@ module Searchkick if batch import_or_update scope.to_a, method_name, async - redis.srem(batches_key, batch_id) if batch_id && redis + Searchkick.with_redis { |r| r.srem(batches_key, batch_id) } if batch_id elsif full && async full_reindex_async(scope) elsif scope.respond_to?(:find_in_batches) @@ -283,7 +283,7 @@ module Searchkick end def batches_left - redis.scard(batches_key) if redis + Searchkick.with_redis { |r| r.scard(batches_key) } end # other @@ -459,7 +459,7 @@ module Searchkick index_name: name, batch_id: batch_id }.merge(options)) - redis.sadd(batches_key, batch_id) if redis + Searchkick.with_redis { |r| r.sadd(batches_key, batch_id) } end def batch_size @@ -492,10 +492,6 @@ module Searchkick Searchkick.indexer.queue(records.map { |r| {update: record_data(r).merge(data: {doc: search_data(r, method_name)})} }) end - def redis - Searchkick.redis - end - def batches_key "searchkick:reindex:#{name}:batches" end diff --git a/lib/searchkick/reindex_queue.rb b/lib/searchkick/reindex_queue.rb index 8534ec2..49cfe2a 100644 --- a/lib/searchkick/reindex_queue.rb +++ b/lib/searchkick/reindex_queue.rb @@ -5,36 +5,32 @@ module Searchkick def initialize(name) @name = name - raise Searchkick::Error, "Searchkick.redis not set" unless redis + raise Searchkick::Error, "Searchkick.redis not set" unless Searchkick.redis end def push(record_id) - redis.lpush(redis_key, record_id) + Searchkick.with_redis { |r| r.lpush(redis_key, record_id) } end # TODO use reliable queuing def reserve(limit: 1000) record_ids = Set.new - while record_ids.size < limit && record_id = redis.rpop(redis_key) + while record_ids.size < limit && record_id = Searchkick.with_redis { |r| r.rpop(redis_key) } record_ids << record_id end record_ids.to_a end def clear - redis.del(redis_key) + Searchkick.with_redis { |r| r.del(redis_key) } end def length - redis.llen(redis_key) + Searchkick.with_redis { |r| r.llen(redis_key) } end private - def redis - Searchkick.redis - end - def redis_key "searchkick:reindex_queue:#{name}" end diff --git a/test/test_helper.rb b/test/test_helper.rb index dae9e9f..fe36fb4 100644 --- a/test/test_helper.rb +++ b/test/test_helper.rb @@ -14,7 +14,13 @@ File.delete("elasticsearch.log") if File.exist?("elasticsearch.log") Searchkick.client.transport.logger = Logger.new("elasticsearch.log") Searchkick.search_timeout = 5 -Searchkick.redis = Redis.new if defined?(Redis) +if defined?(Redis) + if defined?(ConnectionPool) + Searchkick.redis = ConnectionPool.new { Redis.new } + else + Searchkick.redis = Redis.new + end +end puts "Running against Elasticsearch #{Searchkick.server_version}" -- libgit2 0.21.0