Почему мои сообщения AMQP сползут перед отправкой - PullRequest
1 голос
/ 14 января 2012

У меня проблема с тем, что публикация около двадцати сообщений AMQP в общую очередь в RabbitMQ задерживается в пакете до того, как другой потребитель получит сообщения.Топология моего приложения имеет три конечные точки:

  1. Первая конечная точка будет генерировать пакет сообщений, каждое из которых содержит полное доменное имя сетевого устройства, из которого будет получен файл конфигурации.Будет один экземпляр этой конечной точки, и он является единственным издателем в прямой очереди с именем net.svc.ssh.
  2. Вторая конечная точка будет использовать одно из этих сообщений, SSH, на указанное сетевое устройство, запустить CLIКоманда и опубликовать результат с каждого устройства в другую очередь.Будет несколько экземпляров этой конечной точки, каждый из которых использует net.svc.ssh и публикует в net.svc.savefile.
  3. Третья конечная точка получит этот результат и сохранит его на локальном диске в своем отдельномфайл.Этот экземпляр является единственным потребителем net.svc.savefile и не публикует никаких сообщений.

Я вижу, что все вторые конечные точки будут "держаться" за свои исходящие сообщения, пока не будет сообщенийосталось потреблять из очереди ssh.Это предотвращает своевременную доставку конфигураций устройства к конечной точке.Я наблюдаю за тем, чтобы каждая конфигурация появлялась в одно и то же время, когда все средние конечные точки потребляют свои сообщения.

Итак, мой вопрос: почему я вижу такое поведение спулинга?Я хотел бы, чтобы мои сообщения публиковались в тот момент, когда они готовы, и это поведение не предназначено.

конечная точка 1 - отправитель запросов резервного копирования сетевого устройства:

EventMachine.add_timer( 0 ) do
  YAML::load_file( 'hosts.yaml' ).each do |fqdn,ip|
    payload = { :fqdn => fqdn }
    exchange.publish( payload.to_json, :routing_key => 'net.svc.ssh' )
  end
end

конечная точка 2 - узел ssh

mq_queue.subscribe( :ack => false ) do |meta, mq_payload|
  payload = JSON.parse( mq_payload)

  # SSH stuff happens here

  mq_channel.default_exchange.publish( payload.to_json,
                                       :routing_key => 'net.svc.savefile',
                                       )
  end
end

конечная точка 3 - узел, который сохраняет конфигурацию устройства в файл

queue.subscribe( ) do |meta, mq_payload|
  payload = JSON.parse( mq_payload )
  payload[ "host" ].each do |host,hash|
    File.open( File.join('backups', host), "w" ) do |file|
      result = hash[ "result" ]
      if( result[ "error" ].nil? )
        file << result[ "show running-config" ].join( "\n" )
      else
        file << result[ "error" ]
      end
    end
  end
end

1 Ответ

1 голос
/ 14 января 2012

Похоже, вы публикуете сообщения в теме обмена с поведением по умолчанию.Я предполагаю, что ваши потребители (есть несколько экземпляров) подписаны на очередь с простой привязкой к ключу маршрутизации, т.е. несколько потребителей будут принимать сообщения в циклическом порядке.

Или, возможно, есть только один потребитель.

Я не использовал Ruby в течение многих лет, поэтому я не знаком с особенностями различных библиотек, но я не могу видеть, где ваши потребители получают сообщения после работы с ними.На потребителе 2 вы, кажется, явно установили auto-ack на FALSE, что хорошо, а на потребителе 3 вы не установили его так, возможно, это означает, что происходит автоматическое подтверждение.

Когда я пишу потребителю, который публикуетрезультаты в другой очереди, я делаю вещи в следующем порядке:

  1. получить входящее сообщение
  2. выполнить работу
  3. опубликовать новое сообщение в очереди результатов
  4. подтверждение входящего сообщения
...