Как мне сделать этого подписчика на одно сообщение AMQP стабильным? - PullRequest
3 голосов
/ 05 октября 2011

Как часть более крупного приложения, мне нужно настроить базовое ограничение скорости исходящих запросов для нескольких сотрудников.Идея, лежащая в основе этого, довольно проста: публикуя сообщение «токен» с флагом «немедленное», это сообщение автоматически отбрасывается, если его никто не ждет.Если работники подписываются на очередь токенов только перед отправкой исходящего запроса, токены не «сохраняются», и каждый токен доступен для использования только один раз.Я подумал, что это довольно элегантно.

К сожалению, добавление и удаление подписчиков не совсем стабильно.Я установил полный пример на https://gist.github.com/1263921/ebdafa067ca09514183d3fc5d6e43c7094fc2733. Код ниже:

require 'bundler'
Bundler.setup

require 'amqp'

puts "single-message consumer listening to rapid producer"

QUEUE_NAME   = 'test.rapid-queue-unsubscription'
PRODUCE_RATE = 1.0/10
CONSUME_RATE = 1.0/9

def start_producer
  exchange = AMQP::Exchange.new(AMQP::Channel.new, :direct, "") 

  n = 0
  EM::PeriodicTimer.new(PRODUCE_RATE) do
    message = "msg #{n}"
    exchange.publish(message,
                     :immediate   => true, # IMPORTANT, messages are dropped if nobody listening now
                     :routing_key => QUEUE_NAME)
    puts "> PUT #{message}"
    n += 1
  end
end

def start_consumer

  EM::PeriodicTimer.new(CONSUME_RATE) do

    started = Time.now
    AMQP::Channel.new do |channel_consumer|
      channel_consumer.prefetch(1)
      tick_queue = channel_consumer.queue(QUEUE_NAME)

      consumer = AMQP::Consumer.new(channel_consumer, tick_queue, nil, exclusive = false, no_ack = true)
      consumer.on_delivery do |_, message|

        took = Time.now - started
        puts "< GET #{message} [waited #{took.round(2)}s][#{(1.0/took).round(2)} reqs/sec]"

        consumer.cancel
        channel_consumer.close
      end
      consumer.consume
    end
  end
end

EM.run do
  EM.set_quantum(50)

  start_producer
  start_consumer
end

Выполнение этого примера в течение нескольких минут приводит к смерти с одной из двух ошибок:

  1. amq-client-0.8.3/lib/amq/client/async/consumer.rb:246:in `block in <class:Consumer>': undefined method `handle_delivery' for nil:NilClass (NoMethodError)

  2. amq-client-0.8.3/lib/amq/client/async/adapter.rb:244:in `send_frame': Trying to send frame through a closed connection. Frame is #<AMQ::Protocol::MethodFrame:0x007fa6d29a35f0 @payload="\x00<\x00(\x00\x00\x00\x1Ftest.rapid-queue-unsubscription\x02", @channel=1> (AMQ::Client::ConnectionClosedError)

Первая ошибка связана с удалением абонента, носообщение все еще доставляется ему, и библиотека amq-client никогда не ожидает, что это произойдет.Вторая ошибка от издателя, который внезапно имеет закрытое соединение.

Чего мне не хватает, чтобы это работало должным образом?

Используемые версии:

  • OS X 10.7.1
  • ruby ​​1.9.2p312 (2011-08-11 редакция 32926) [x86_64-darwin11.1.0]
  • RabbitMQ 2.6.1

Gemfile:

source 'http://rubygems.org'

gem 'amqp'

Gemfile.lock:

GEM
  remote: http://rubygems.org/
  specs:
    amq-client (0.8.3)
      amq-protocol (>= 0.8.0)
      eventmachine
    amq-protocol (0.8.1)
    amqp (0.8.0)
      amq-client (~> 0.8.3)
      amq-protocol (~> 0.8.0)
      eventmachine
    eventmachine (0.12.10)

PLATFORMS
  ruby

DEPENDENCIES
  amqp
  eventmachine

1 Ответ

2 голосов
/ 05 октября 2011

Из канала #rabbitmq (автор amqp antares_): просто используйте один канал, и он будет работать нормальноСлегка измененная, но стабильная версия:

require 'bundler'
Bundler.setup

require 'amqp'

puts "single-message consumer listening to rapid producer"

QUEUE_NAME   = 'test.rapid-queue-unsubscription'
PRODUCE_RATE = 1.0/10
CONSUME_RATE = 1.0/9

def start_producer channel
  exchange = AMQP::Exchange.new(channel, :direct, "") 

  n = 0
  EM::PeriodicTimer.new(PRODUCE_RATE) do
    message = "msg #{n}"
    exchange.publish(message,
                     :immediate   => true, # IMPORTANT, messages are dropped if nobody listening now
                     :routing_key => QUEUE_NAME)
    puts "> PUT #{message}"
    n += 1
  end
end

def start_consumer channel
  EM::PeriodicTimer.new(CONSUME_RATE) do

    started = Time.now
    tick_queue = channel.queue(QUEUE_NAME)

    consumer = AMQP::Consumer.new(channel, tick_queue, nil, exclusive = false, no_ack = true)
    consumer.on_delivery do |_, message|

      took = Time.now - started
      puts "< GET #{message} [waited #{took.round(2)}s][#{(1.0/took).round(2)} reqs/sec]"

      consumer.cancel do
        puts "< GET #{message} (CANCEL DONE)"
      end
    end
    consumer.consume
  end
end

EM.run do
  EM.set_quantum(50)

  AMQP::Channel.new do |channel|
    start_producer channel
  end

  AMQP::Channel.new do |channel|
    channel.prefetch(1)
    start_consumer channel
  end

end
...