Подписаться на очередь, получить 1 сообщение, а затем отписаться - PullRequest
8 голосов
/ 16 апреля 2011

У меня есть сценарий, в котором мне нужно очень быстро распределять и обрабатывать задания.В очереди у меня будет около 45 заданий, и я смогу одновременно обработать около 20 (5 машин по 4 ядра).Каждое задание занимает различное количество времени, и для усложнения вопросов сборка мусора является проблемой, поэтому мне нужно иметь возможность перевести потребителя в автономный режим для сбора мусора.

В настоящее время у меня все работает с pop (каждый потребительпоявляется каждые 5 мс).Это кажется нежелательным, потому что это переводит 600 запросов pop в секунду в rabbitmq.

Мне бы очень хотелось, если бы была команда pop, которая действовала бы как подписка, но только для одного сообщения.(процесс заблокировал бы, ожидая ввода от соединения rabbitMQ, через что-то похожее на Kernel.select)

Я пытался обмануть гем AMQP, чтобы сделать что-то подобное, но он не работает: я могу 'Кажется, отписаться, пока очередь не пуста и больше не отправляются сообщения потребителю.Я боюсь, что другие способы отписки потеряют сообщения.

потребление_1.rb:

require "amqp"

EventMachine.run do
  puts "connecting..."
  connection = AMQP.connect(:host => "localhost", :user => "guest", :pass => "guest", :vhost => "/")
  puts "Connected to AMQP broker"

  channel = AMQP::Channel.new(connection)
  queue = channel.queue("tasks", :auto_delete => true)
  exchange = AMQP::Exchange.default(channel)

  queue.subscribe do |payload|
    puts "Received a message: #{payload}."
    queue.unsubscribe { puts "unbound" }
    sleep 3
  end
end

consumer_many.rb:

require "amqp"

# Imagine the command is something CPU - intensive like image processing.
command = "sleep 0.1"

EventMachine.run do
  puts "connecting..."
  connection = AMQP.connect(:host => "localhost", :user => "guest", :pass => "guest", :vhost => "/")
  puts "Connected to AMQP broker"

  channel = AMQP::Channel.new(connection)
  queue = channel.queue("tasks", :auto_delete => true)
  exchange = AMQP::Exchange.default(channel)

  queue.subscribe do |payload|
    puts "Received a message: #{payload}."
  end
end

provider.rb:

require "amqp"

i = 0
EventMachine.run do
  connection = AMQP.connect(:host => "localhost", :user => "guest", :pass => "guest", :vhost => "/")
  puts "Connected to AMQP broker"

  channel = AMQP::Channel.new(connection)
  queue = channel.queue("tasks", :auto_delete => true)
  exchange = AMQP::Exchange.default(channel)

  EM.add_periodic_timer(1) do
    msg = "Message #{i}"
    i+=1
    puts "~ publishing #{msg}"
  end
end

Я запущу Потребительские_мани.рб и продюсер.рб.Сообщения будут передаваться, как и ожидалось.

Когда я запускаю потребление_1.rb, оно получает все остальные сообщения (как и ожидалось).Но он НИКОГДА не отписывается, потому что он никогда не заканчивает обработку всех своих сообщений ... и так далее.

Как я могу получить consumer_1.rb, чтобы подписаться на очередь, получить одно сообщение, а затем удалить себякольца балансировки нагрузки, чтобы он мог выполнять свою работу без потери каких-либо дополнительных ожидающих заданий, которые могли бы находиться в очереди и в противном случае были бы запланированы для отправки процессу?

Тим

Ответы [ 2 ]

13 голосов
/ 16 апреля 2011

Это простая, но очень плохо документированная особенность гема AMQP, что вам нужно:

У вашего потребителя:

channel = AMQP::Channel.new(connection, :prefetch => 1)

А затем с вашим блоком подписки,do:

queue.subscribe(:ack => true) do |queue_header, payload|
  puts "Received a message: #{payload}."
  # Do long running work here

  # Acknowledge message
  queue_header.ack
end

Что это означает, что RabbitMQ сообщает, что отправляет вашему потребителю только 1 сообщение за раз, и не отправляет другое сообщение, пока вы не вызовете ack в заголовке очереди после некоторое время я работал над долгосрочным заданием.

Возможно, мне нужно исправить это, но я считаю, что обмен direct лучше подойдет для этого задания.

1 голос
/ 27 марта 2015

С окружением, которое у меня есть,

Версия RabbitMQ: 3.3.3

AMQP Gem версия: 1.5.0

Решение от Ивана все еще привело к тому, что все сообщения извлекались из очереди.

Вместо этого количество неподтвержденных сообщений путем подписки на очередь может быть ограничено установкой QoS канала.

Согласно документу API AMQP :: Channel,

#qos(prefetch_size = 0, prefetch_count = 32, global = false, &block) ⇒ Object

Примечание к этому методу: если вы работаете с серверами RabbitMQ после версии 2.3.6, prefetch_size не рекомендуется.

channel = AMQP::Channel.new(connection, :prefetch => 1)
channel.qos(0, 1)

queue = channel.queue(queue_name, :auto_delete => false)
queue.subscribe(:ack => true) do |metadata, payload|
  puts "Received a message: #{payload}."

  # Do long running work here

  # Acknowledge message
  metadata.ack
end

Надеюсь, что решение кому-нибудь поможет.

Приветствие.

...