Очистка очереди с использованием AMQP, Rabbit и Ruby - PullRequest
1 голос
/ 15 апреля 2010

Я разрабатываю систему на Ruby, которая будет использовать RabbitMQ для отправки сообщений в очередь, поскольку она выполняет некоторую работу. Я использую:

Большинство примеров, которые я видел в этом драгоценном камне, имеют вызовы публикации в блоке EM.add_periodic_timer. Это не работает для того, что я подозреваю, в подавляющем большинстве случаев, и, конечно, не для меня. Мне нужно опубликовать сообщение, когда я завершу некоторую работу, поэтому недостаточно поместить оператор публикации в блок add_periodic_timer.

Итак, я пытаюсь выяснить, как опубликовать несколько сообщений в очереди, а затем «очистить» их, чтобы все опубликованные сообщения доставлялись моим подписчикам.

Чтобы дать вам представление о том, что я имею в виду, рассмотрим следующий код издателя:

#!/usr/bin/ruby

require 'rubygems'
require 'mq'

MESSAGES = ["hello","goodbye","test"]

AMQP.start do

  queue = MQ.queue('testq')
  messages_published = 0

  while (messages_published < 50)

    if (rand() < 0.4)
      message = MESSAGES[rand(MESSAGES.size)]
      puts "#{Time.now.to_s}: Publishing: #{message}"
      queue.publish(message)
      messages_published += 1
    end

    sleep(0.1)

  end

  AMQP.stop do
    EM.stop
  end

end

Итак, этот код просто зацикливается, публикуя сообщение с вероятностью 40% на каждой итерации цикла, а затем спит в течение 0,1 секунды. Он делает это до тех пор, пока не будет опубликовано 50 сообщений, а затем останавливает AMQP. Конечно, это всего лишь подтверждение концепции.

Теперь мой код подписчика:

#!/usr/bin/ruby

require 'rubygems'
require 'mq'

AMQP.start do

  queue = MQ.queue('testq') 
  queue.subscribe do |header, msg|
    puts "#{Time.now.to_s}: Received #{msg}"
  end

end

Итак, мы просто подписываемся на очередь, и для каждого полученного сообщения мы распечатываем его.

Отлично, за исключением того, что подписчик получает все 50 сообщений только тогда, когда издатель вызывает AMQP.stop.

Вот вывод моего издателя. Для краткости оно было усечено посередине:

$ ruby publisher.rb 
2010-04-14 21:45:42 -0400: Publishing: test
2010-04-14 21:45:42 -0400: Publishing: hello
2010-04-14 21:45:42 -0400: Publishing: test
2010-04-14 21:45:43 -0400: Publishing: test
2010-04-14 21:45:44 -0400: Publishing: test
2010-04-14 21:45:44 -0400: Publishing: goodbye
2010-04-14 21:45:45 -0400: Publishing: goodbye
2010-04-14 21:45:45 -0400: Publishing: test
2010-04-14 21:45:45 -0400: Publishing: test
.
.
.
2010-04-14 21:45:55 -0400: Publishing: test
2010-04-14 21:45:55 -0400: Publishing: test
2010-04-14 21:45:55 -0400: Publishing: test
2010-04-14 21:45:55 -0400: Publishing: goodbye

Далее вывод от моего подписчика:

$ ruby consumer.rb
2010-04-14 21:45:56 -0400: Received test
2010-04-14 21:45:56 -0400: Received hello
2010-04-14 21:45:56 -0400: Received test
2010-04-14 21:45:56 -0400: Received test
2010-04-14 21:45:56 -0400: Received test
2010-04-14 21:45:56 -0400: Received goodbye
2010-04-14 21:45:56 -0400: Received goodbye
2010-04-14 21:45:56 -0400: Received test
2010-04-14 21:45:56 -0400: Received test
.
.
.
2010-04-14 21:45:56 -0400: Received test
2010-04-14 21:45:56 -0400: Received test
2010-04-14 21:45:56 -0400: Received test
2010-04-14 21:45:56 -0400: Received goodbye

Если вы отметите временные метки в выходных данных, подписчик получит все сообщения только после того, как издатель остановит AMQP и завершит работу.

Итак, будучи новичком AMQP, как я могу получить мои сообщения для немедленной доставки? Я попытался поместить AMQP.start и AMQP.stop в тело цикла while издателя, но затем доставляется только первое сообщение - хотя странным образом, если я включаю ведение журнала, сервер не выдает никаких сообщений об ошибках и сообщения отправляются в очередь, но никогда не принимаются подписчиком.

Предложения будут высоко оценены. Спасибо за чтение.

1 Ответ

2 голосов
/ 15 апреля 2010

Для всех, кому нужна информация по этому вопросу, см. http://groups.google.com/group/ruby-amqp/browse_thread/thread/311965bcf9697ece

Я исправил проблему, добавив дополнительный поток в код моего издателя:

!/usr/bin/ruby

require 'rubygems'
require 'mq'

MESSAGES = ["hello","goodbye","test"]

AMQP.start do

  Thread.new do
      queue = MQ.queue('testq')
      messages_published = 0

      while (messages_published < 50)

        if (rand() < 0.4)
          message = MESSAGES[rand(MESSAGES.size)]
          puts "#{Time.now.to_s}: Publishing: #{message}"
          queue.publish(message)
          messages_published += 1
        end

        sleep(0.1)

      end

      AMQP.stop do
        EM.stop
      end

  end

end
...