Commit 7b06dbe4cdec37ff1349ccf01bf8cb3295971007

Authored by Andrew Kane
1 parent 2edb71f2

Added support for connection_pool

CHANGELOG.md
1 1 ## 2.1.1 [unreleased]
2 2  
3 3 - Fixed duplicate notifications
  4 +- Added support for `connection_pool`
4 5  
5 6 ## 2.1.0
6 7  
... ...
Gemfile
... ... @@ -9,3 +9,4 @@ gem "gemoji-parser"
9 9 gem "typhoeus"
10 10 gem "activejob"
11 11 gem "redis"
  12 +gem "connection_pool"
... ...
lib/searchkick.rb
... ... @@ -145,6 +145,18 @@ module Searchkick
145 145 end
146 146 end
147 147  
  148 + def self.with_redis
  149 + if redis
  150 + if redis.respond_to?(:with)
  151 + redis.with do |r|
  152 + yield r
  153 + end
  154 + else
  155 + yield redis
  156 + end
  157 + end
  158 + end
  159 +
148 160 # private
149 161 def self.load_records(records, ids)
150 162 records =
... ...
lib/searchkick/index.rb
... ... @@ -258,7 +258,7 @@ module Searchkick
258 258  
259 259 if batch
260 260 import_or_update scope.to_a, method_name, async
261   - redis.srem(batches_key, batch_id) if batch_id && redis
  261 + Searchkick.with_redis { |r| r.srem(batches_key, batch_id) } if batch_id
262 262 elsif full && async
263 263 full_reindex_async(scope)
264 264 elsif scope.respond_to?(:find_in_batches)
... ... @@ -283,7 +283,7 @@ module Searchkick
283 283 end
284 284  
285 285 def batches_left
286   - redis.scard(batches_key) if redis
  286 + Searchkick.with_redis { |r| r.scard(batches_key) }
287 287 end
288 288  
289 289 # other
... ... @@ -459,7 +459,7 @@ module Searchkick
459 459 index_name: name,
460 460 batch_id: batch_id
461 461 }.merge(options))
462   - redis.sadd(batches_key, batch_id) if redis
  462 + Searchkick.with_redis { |r| r.sadd(batches_key, batch_id) }
463 463 end
464 464  
465 465 def batch_size
... ... @@ -492,10 +492,6 @@ module Searchkick
492 492 Searchkick.indexer.queue(records.map { |r| {update: record_data(r).merge(data: {doc: search_data(r, method_name)})} })
493 493 end
494 494  
495   - def redis
496   - Searchkick.redis
497   - end
498   -
499 495 def batches_key
500 496 "searchkick:reindex:#{name}:batches"
501 497 end
... ...
lib/searchkick/reindex_queue.rb
... ... @@ -5,36 +5,32 @@ module Searchkick
5 5 def initialize(name)
6 6 @name = name
7 7  
8   - raise Searchkick::Error, "Searchkick.redis not set" unless redis
  8 + raise Searchkick::Error, "Searchkick.redis not set" unless Searchkick.redis
9 9 end
10 10  
11 11 def push(record_id)
12   - redis.lpush(redis_key, record_id)
  12 + Searchkick.with_redis { |r| r.lpush(redis_key, record_id) }
13 13 end
14 14  
15 15 # TODO use reliable queuing
16 16 def reserve(limit: 1000)
17 17 record_ids = Set.new
18   - while record_ids.size < limit && record_id = redis.rpop(redis_key)
  18 + while record_ids.size < limit && record_id = Searchkick.with_redis { |r| r.rpop(redis_key) }
19 19 record_ids << record_id
20 20 end
21 21 record_ids.to_a
22 22 end
23 23  
24 24 def clear
25   - redis.del(redis_key)
  25 + Searchkick.with_redis { |r| r.del(redis_key) }
26 26 end
27 27  
28 28 def length
29   - redis.llen(redis_key)
  29 + Searchkick.with_redis { |r| r.llen(redis_key) }
30 30 end
31 31  
32 32 private
33 33  
34   - def redis
35   - Searchkick.redis
36   - end
37   -
38 34 def redis_key
39 35 "searchkick:reindex_queue:#{name}"
40 36 end
... ...
test/test_helper.rb
... ... @@ -14,7 +14,13 @@ File.delete(&quot;elasticsearch.log&quot;) if File.exist?(&quot;elasticsearch.log&quot;)
14 14 Searchkick.client.transport.logger = Logger.new("elasticsearch.log")
15 15 Searchkick.search_timeout = 5
16 16  
17   -Searchkick.redis = Redis.new if defined?(Redis)
  17 +if defined?(Redis)
  18 + if defined?(ConnectionPool)
  19 + Searchkick.redis = ConnectionPool.new { Redis.new }
  20 + else
  21 + Searchkick.redis = Redis.new
  22 + end
  23 +end
18 24  
19 25 puts "Running against Elasticsearch #{Searchkick.server_version}"
20 26  
... ...