EventMachine EM :: Итератор блокируется RPC rabbitmq - PullRequest
0 голосов
/ 06 января 2012

Я пытаюсь настроить RabbitMQ rpc. Я хочу, чтобы прослушивалась одна очередь, и когда она получает сообщение, я хочу, чтобы он отвечал на анонимную очередь, указанную в заголовке reply_to с несколькими сообщениями.

У меня есть следующая задача: создать очередь и затем использовать EM: Iterator для отправки нескольких сообщений обратно в очередь, указанную с помощью ключа маршрутизации replyt_to:

desc "start_consumer", "start the test consumer"
def start_consumer
  conf = {
    :host => "localhost", 
    :user => "guest", 
    :password => "guest", 
    :vhost => "/", 
    :logging => true, 
    :port => 5672
  }

  # n = 1

  AMQP.start(conf) do |connection|

    channel = AMQP::Channel.new(connection)

    requests_queue = channel.queue("one")
    requests_queue.purge

    Signal.trap("INT") do
      connection.close do
        EM.stop{exit}
      end
    end

    channel.prefetch(1)

    requests_queue.subscribe(:ack => true) do |header, body|
      url_search = MultiJson.decode(body)

      EM::Iterator.new(0..5).each do |n, iter|
        lead = get_lead(n, (n == 5))

        puts "about to publish #{n} message is_last = #{lead.is_last} at #{Time.now}"

        AMQP::Exchange.default.publish(
                                        MultiJson.encode(lead), 
                                        :immediate => true,
                                        :routing_key => header.reply_to,
                                        :correlation_id => header.correlation_id
                                      )  

        iter.next
      end
    end

    puts " [x] Awaiting RPC requests"
  end          
end

Код ниже отправляет сообщение в указанную выше очередь, а также создает очередь, которая будет использоваться для прослушивания сообщений, отправленных кодом EM :: Iterator. Имя этой очереди является ключом маршрутизации для первых очередей reply_to заголовка.

def publish(urlSearch, routing_key)
  EM.run do
    corr_id = rand(10_000_000).to_s

    requests ||= Hash.new

    connection = AMQP.connect(:host => "localhost")

    callback_queue = AMQP::Channel.new(connection).queue("", :exclusive => false)

    callback_queue.subscribe do |header, body|  
      lead = safe_json_decode(body)

      puts "company = #{lead["company"]} is_last = #{lead["is_last"]} received at #{Time.now}"              

      if lead["is_last"]
        puts "in exit"
        connection.close do
          EM.stop{exit}
        end
      end
    end

    callback_queue.append_callback(:declare) do
      AMQP::Exchange.default.publish(MultiJson.encode(urlSearch), :routing_key => routing_key, :reply_to => callback_queue.name, :correlation_id => corr_id)
    end

    puts "initial message sent"
  end
end

Приведенный выше код работает, как я хочу, с одним раздражающим исключением. Что-то блокирует выполнение кода EM :: Iterator асинхронно. Только после завершения кода EM :: Iterator сообщения отправляются. Я хочу, чтобы сообщения отправлялись асинхронно и обрабатывались анонимной очередью после каждой итерации. В настоящее время все сообщения отправляются только после того, как код EM :: Iterator завершил свою последнюю итерацию.

Кто-нибудь может увидеть, что я делаю неправильно, или предложить другой подход? Я попробовал EM :: defer и вел себя так же.

1 Ответ

0 голосов
/ 08 января 2012

Создание новой темы было ответом на мою проблему:

Thread.new do
  5.times do
    lead = get_lead(n, (n == 5))

    puts "message #{n} is_last = #{lead.is_last} at #{Time.now}";

    AMQP::Exchange.default.publish(
                                    MultiJson.encode(lead), 
                                    :routing_key => header.reply_to,
                                    :correlation_id => header.correlation_id
                                  )

    n += 1
    sleep(2)
  end
end

Создание нового потока останавливает блокировку реактора EventMachine, и сообщения отправляются асинхронно.

...