Я разрабатываю систему на Ruby, которая будет использовать RabbitMQ для отправки сообщений в очередь, поскольку она выполняет некоторую работу. Я использую:
Большинство примеров, которые я видел в этом драгоценном камне, имеют вызовы публикации в блоке EM.add_periodic_timer. Это не работает для того, что я подозреваю, в подавляющем большинстве случаев, и, конечно, не для меня. Мне нужно опубликовать сообщение, когда я завершу некоторую работу, поэтому недостаточно поместить оператор публикации в блок add_periodic_timer.
Итак, я пытаюсь выяснить, как опубликовать несколько сообщений в очереди, а затем «очистить» их, чтобы все опубликованные сообщения доставлялись моим подписчикам.
Чтобы дать вам представление о том, что я имею в виду, рассмотрим следующий код издателя:
#!/usr/bin/ruby
require 'rubygems'
require 'mq'
MESSAGES = ["hello","goodbye","test"]
AMQP.start do
queue = MQ.queue('testq')
messages_published = 0
while (messages_published < 50)
if (rand() < 0.4)
message = MESSAGES[rand(MESSAGES.size)]
puts "#{Time.now.to_s}: Publishing: #{message}"
queue.publish(message)
messages_published += 1
end
sleep(0.1)
end
AMQP.stop do
EM.stop
end
end
Итак, этот код просто зацикливается, публикуя сообщение с вероятностью 40% на каждой итерации цикла, а затем спит в течение 0,1 секунды. Он делает это до тех пор, пока не будет опубликовано 50 сообщений, а затем останавливает AMQP. Конечно, это всего лишь подтверждение концепции.
Теперь мой код подписчика:
#!/usr/bin/ruby
require 'rubygems'
require 'mq'
AMQP.start do
queue = MQ.queue('testq')
queue.subscribe do |header, msg|
puts "#{Time.now.to_s}: Received #{msg}"
end
end
Итак, мы просто подписываемся на очередь, и для каждого полученного сообщения мы распечатываем его.
Отлично, за исключением того, что подписчик получает все 50 сообщений только тогда, когда издатель вызывает AMQP.stop.
Вот вывод моего издателя. Для краткости оно было усечено посередине:
$ ruby publisher.rb
2010-04-14 21:45:42 -0400: Publishing: test
2010-04-14 21:45:42 -0400: Publishing: hello
2010-04-14 21:45:42 -0400: Publishing: test
2010-04-14 21:45:43 -0400: Publishing: test
2010-04-14 21:45:44 -0400: Publishing: test
2010-04-14 21:45:44 -0400: Publishing: goodbye
2010-04-14 21:45:45 -0400: Publishing: goodbye
2010-04-14 21:45:45 -0400: Publishing: test
2010-04-14 21:45:45 -0400: Publishing: test
.
.
.
2010-04-14 21:45:55 -0400: Publishing: test
2010-04-14 21:45:55 -0400: Publishing: test
2010-04-14 21:45:55 -0400: Publishing: test
2010-04-14 21:45:55 -0400: Publishing: goodbye
Далее вывод от моего подписчика:
$ ruby consumer.rb
2010-04-14 21:45:56 -0400: Received test
2010-04-14 21:45:56 -0400: Received hello
2010-04-14 21:45:56 -0400: Received test
2010-04-14 21:45:56 -0400: Received test
2010-04-14 21:45:56 -0400: Received test
2010-04-14 21:45:56 -0400: Received goodbye
2010-04-14 21:45:56 -0400: Received goodbye
2010-04-14 21:45:56 -0400: Received test
2010-04-14 21:45:56 -0400: Received test
.
.
.
2010-04-14 21:45:56 -0400: Received test
2010-04-14 21:45:56 -0400: Received test
2010-04-14 21:45:56 -0400: Received test
2010-04-14 21:45:56 -0400: Received goodbye
Если вы отметите временные метки в выходных данных, подписчик получит все сообщения только после того, как издатель остановит AMQP и завершит работу.
Итак, будучи новичком AMQP, как я могу получить мои сообщения для немедленной доставки? Я попытался поместить AMQP.start и AMQP.stop в тело цикла while издателя, но затем доставляется только первое сообщение - хотя странным образом, если я включаю ведение журнала, сервер не выдает никаких сообщений об ошибках и сообщения отправляются в очередь, но никогда не принимаются подписчиком.
Предложения будут высоко оценены. Спасибо за чтение.