RabbitMQ и EventMachine :: defer - PullRequest
       23

RabbitMQ и EventMachine :: defer

0 голосов
/ 25 ноября 2011

Я пытаюсь разобраться с RabbitMQ. Я хочу, чтобы прослушивалась одна очередь, и когда она получает сообщение, я хочу, чтобы он отвечал на анонимную очередь, указанную в заголовке reply_to с несколькими сообщениями.

Пока у меня есть следующая задача, которая является и потребителем, и подписчиком для сообщения reply_to:

desc "start_consumer", "start the test consumer"
def start_consumer
  puts "Running #{AMQP::VERSION} version of the gem"

      AMQP.start(:host => "localhost", :user => "guest", :password => "guest", :vhost => "/", 
                        :logging => true, :port => 5672) do |connection|

        channel = AMQP::Channel.new(connection)

        requests_queue = channel.queue("one", :exclusive => true, :auto_delete => true)

        Signal.trap("INT") do
          connection.close do
            EM.stop{exit}
          end
        end

        channel.prefetch(1)

        requests_queue.subscribe(:ack => true) do |header, body|
          puts "received in server #{body.inspect}"

          (0..5).each do |n|
            header.ack

            reply = {:reply => "Respone #{n}", :is_last => (n == 5)}

            AMQP::Exchange.default.publish(
                                            MultiJson.encode(reply), 
                                            :routing_key => header.reply_to,
                                            :correlation_id => header.correlation_id
                                          )


            sleep(2)
          end
        end

        puts " [x] Awaiting RPC requests"
      end          
    end

Мой телефонный код:

  def publish(urlSearch, routing_key)
    EM.run do

      corr_id = rand(10_000_000).to_s

      requests ||= Hash.new

      connection = AMQP.connect(:host => "localhost")

      callback_queue = AMQP::Channel.new(connection).queue("", :exclusive => true)             

      callback_queue.subscribe do |header, body|

        reply = MultiJson.decode(body)               

        if reply[:is_last.to_s]  
          connection.close do
            EM.stop{exit}
          end
        end
      end                          

      callback_queue.append_callback(:declare) do
        AMQP::Exchange.default.publish(MultiJson.encode(urlSearch), :routing_key => routing_key, :reply_to => callback_queue.name, :correlation_id => corr_id)
      end

    end

Проблема в том, что сообщения не отправляются, пока не будут опубликованы все сообщения в итерации.

То есть после завершения итерации в цикле (0..5) все сообщения публикуются.

Кто-то сказал мне, что можно использовать EventMachine::defer, но я не уверен, как применить это к циклу.

Кто-нибудь может подсказать, как решить эту проблему? EventMachine::defer хороший вариант и, если да, то как мне это сделать при использовании AMQP.start?

1 Ответ

0 голосов
/ 15 декабря 2011

При работе с ЭМ следует помнить, что сам реактор является однопоточным. Итак, причина, по которой он ожидает завершения цикла, прежде чем что-либо отправлять, заключается в том, что цикл занимает этот единственный поток. (Вы действительно, действительно, действительно не хотите использовать сон в приложении EM. Вы блокируете весь реактор, и ничего не произойдет.)

Если вы хотите, чтобы сообщения публиковались или действительно выполнялись какие-либо другие действия EM (срабатывание таймеров, получение других данных и т. Д.), Вы должны позволить реактору зацикливаться.

Вы можете использовать что-то вроде EM :: Iterator для планирования работы за вас, что-то вроде:

EM::Iterator.new(0..5).each do |n, iter|
  reply = {:reply => "Response #{n}", :is_last => (n == 5)}
  AMQP::Exchange.default.publish(MultiJson.encode(reply), 
                                 :routing_key => header.reply_to,
                                 :correlation_id => header.correlation_id)
  iter.next
end
header.ack
...