Семафорные блокировки Redis не могут быть освобождены - PullRequest
0 голосов
/ 17 февраля 2020

Я использую гем redis-семафор , версия 0.3.1.

. По некоторым причинам я иногда не могу снять устаревшую блокировку Redis. Из моего анализа кажется, что произошел сбой моего Docker процесса после создания блокировки.

Я описал свой процесс отладки ниже и хотел бы узнать, может ли кто-нибудь предложить, как продолжить отладку.

Предположим, что мы хотим создать блокировку redis с таким именем:

name = "test"

Вставляем эту переменную в два разных терминала windows. В первом мы запускаем:

def lock_for_15_secs(name)
  job = Redis::Semaphore.new(name.to_sym, redis: NonBlockingRedis.new(), custom_blpop: true, :stale_client_timeout => 15)
  if job.lock(-1) == "0"
    puts "Locked and starting"
    sleep(15)
    puts "Now it's stale, try to release in another process"
    sleep(15)
    puts "Now trying to unlock"
    unlock = job.unlock
    puts unlock == false ? "Wuhuu, already unlocked" : "Hm, should have been unlocked by another process, but wasn't"
  end
end
lock_for_15_secs(name)

Во втором мы запускаем:

def release_and_lock(name)
  job     = Redis::Semaphore.new(name.to_sym, redis: NonBlockingRedis.new(), custom_blpop: true, :stale_client_timeout => 15)
  release = job.release_stale_locks!
  count   = job.available_count
  puts "Release reponse is #{release.inspect} and available count is #{count}"
  if job.lock(-1) == "0"
    puts "Wuhuu, we can lock it"
    job.unlock
  else
    puts "Hmm, we can't lock it"
  end
end
release_and_lock(name)

Это обычно проигрывается, как и ожидалось. В течение 15 секунд второй терминал не может включить блокировку, но при запуске через 15 секунд он освобождается. Ниже приведен вывод release_and_lock(name).

До того, как прошло 15 секунд:

irb(main):1:0> release_and_lock(name)
Release reponse is {"0"=>"1580292557.321834"} and available count is 0
Hmm, we can't lock it
=> nil

Через 15 секунд:

irb(main):2:0> release_and_lock(name)
Release reponse is {"0"=>"1580292557.321834"} and available count is 1
Wuhuu, we can lock it
=> 1
irb(main):3:0> release_and_lock(name)
Release reponse is {} and available count is 1
Wuhuu, we can lock it

Но всякий раз, когда я вижу это устаревшая блокировка не снимается, и я запускаю release_and_lock(name), чтобы диагностировать, это возвращается:

irb(main):4:0> release_and_lock(name)
Release reponse is {} and available count is 0
Hmm, we can't lock it

И на данный момент мой единственный вариант - это грипп sh redis:

require 'non_blocking_redis'
non_blocking_redis = NonBlockingRedis.new()
non_blocking_redis.flushall

Ps My NonBlockingRedis наследуется от Redis:

class Redis
  class Semaphore
    def initialize(name, opts = {})
      @custom_opts          = opts
      @name                 = name
      @resource_count       = opts.delete(:resources) || 1
      @stale_client_timeout = opts.delete(:stale_client_timeout)
      @redis                = opts.delete(:redis) || Redis.new(opts)
      @use_local_time       = opts.delete(:use_local_time)
      @custom_blpop         = opts.delete(:custom_blpop) # false=queue, true=cancel
      @tokens               = []
    end

    def lock(timeout = 0)
      exists_or_create!
      release_stale_locks! if check_staleness?
      token_pair    = @redis.blpop(available_key, timeout, @custom_blpop)
      return false if token_pair.nil?
      current_token = token_pair[1]
      @tokens.push(current_token)
      @redis.hset(grabbed_key, current_token, current_time.to_f)
      if block_given?
        begin
          yield current_token
        ensure
          signal(current_token)
        end
      end
      current_token
    end
    alias_method :wait, :lock
  end
end

class NonBlockingRedis < Redis

  def initialize(options = {})
    if options.empty?
      options = {
        url: Rails.application.secrets.redis_url,
        db:  Rails.application.secrets.redis_sidekiq_db,
        driver: :hiredis,
        network_timeout: 5
      }
    end

    super(options)
  end

  def blpop(key, timeout, custom_blpop)
    if custom_blpop
      if timeout == -1
        result = lpop(key)
        return result if result.nil?
        return [key, result]
      else
        super(key, timeout)
      end
    else
       super
    end
  end

  def lock(timeout = 0)
    exists_or_create!
    release_stale_locks! if check_staleness?
    token_pair = @redis.blpop(available_key, timeout, @custom_blpop)
    return false if token_pair.nil?
    current_token = token_pair[1]
    @tokens.push(current_token)
    @redis.hset(grabbed_key, current_token, current_time.to_f)
    if block_given?
      begin
        yield current_token
      ensure
        signal(current_token)
      end
    end
    current_token
  end
  alias_method :wait, :lock
end

require 'non_blocking_redis'

1 Ответ

1 голос
/ 08 марта 2020

? Потрясающая ошибка ?

Ошибка

Я думаю, это произойдет, если вы убьете процесс, когда он делает lpop на SEMAPHORE:test:AVAILABLE

Скорее всего здесь https://github.com/dv/redis-semaphore/blob/v0.3.1/lib/redis/semaphore.rb#L67

Чтобы скопировать его

NonBlockingRedis.new.flushall

release_and_lock('test');

NonBlockingRedis.new.lpop('SEMAPHORE:test:AVAILABLE')

Теперь изначально у вас есть:

SEMAPHORE:test:AVAILABLE                0
SEMAPHORE:test:VERSION          1
SEMAPHORE:test:EXISTS           1

После кода выше вы получите:

SEMAPHORE:test:VERSION          1
SEMAPHORE:test:EXISTS           1

Код проверяет SEMAPHORE:test:EXISTS, а затем ожидает SEMAPHORE:test:AVAILABLE / SEMAPHORE:test:GRABBED

Решение

Из моей краткой проверки я не думаю, что это возможно заставить драгоценный камень работать без модификации. Я попытался добавить expiration:, но каким-то образом ему удалось отключить срок действия SEMAPHORE:test:EXISTS

NonBlockingRedis.new.ttl('SEMAPHORE:test:EXISTS') # => -1 and it should have been e.g. 20 seconds and going down

Так что ... возможно, исправление будет

class Redis
  class Semaphore
    def exists_or_create!
      token = @redis.getset(exists_key, EXISTS_TOKEN)

      if token.nil? || all_tokens.empty?
        create!
      else
        # Previous versions of redis-semaphore did not set `version_key`.
        # Make sure it's set now, so we can use it in future versions.

        if token == API_VERSION && @redis.get(version_key).nil?
          @redis.set(version_key, API_VERSION)
        end

        true
      end
    end
  end
end

all_tokens https://github.com/dv/redis-semaphore/blob/v0.3.1/lib/redis/semaphore.rb#L120

Я скоро открою пиар для драгоценного камня -> https://github.com/dv/redis-semaphore/pull/66 возможно maybe

Примечание 1

Не уверен, как вы используете NonBlockingRedis, но он не используется в Redis::Semaphore. Вы делаете lock(-1), что в коде lpop. Также код никогда не вызывает ваш lock.

Случайный

Вот помощник для сброса ключей

class Test
  def self.all
    r = NonBlockingRedis.new
    puts r.keys('*').map { |k|
      [
        k,
        ((r.hgetall(k) rescue r.get(k)) rescue r.lrange(k, 0, -1).join(' | '))
      ].join("\t\t")
    }
  end
end

> Test.all

SEMAPHORE:test:AVAILABLE                0
SEMAPHORE:test:VERSION          1
SEMAPHORE:test:EXISTS           1

Для полноты вот как это выглядит, когда у вас есть схватил замок

SEMAPHORE:test:VERSION          1
SEMAPHORE:test:EXISTS           1
SEMAPHORE:test:GRABBED          {"0"=>"1583672948.7168388"}
...