У меня проблема с тем, что публикация около двадцати сообщений AMQP в общую очередь в RabbitMQ задерживается в пакете до того, как другой потребитель получит сообщения.Топология моего приложения имеет три конечные точки:
- Первая конечная точка будет генерировать пакет сообщений, каждое из которых содержит полное доменное имя сетевого устройства, из которого будет получен файл конфигурации.Будет один экземпляр этой конечной точки, и он является единственным издателем в прямой очереди с именем net.svc.ssh.
- Вторая конечная точка будет использовать одно из этих сообщений, SSH, на указанное сетевое устройство, запустить CLIКоманда и опубликовать результат с каждого устройства в другую очередь.Будет несколько экземпляров этой конечной точки, каждый из которых использует net.svc.ssh и публикует в net.svc.savefile.
- Третья конечная точка получит этот результат и сохранит его на локальном диске в своем отдельномфайл.Этот экземпляр является единственным потребителем net.svc.savefile и не публикует никаких сообщений.
Я вижу, что все вторые конечные точки будут "держаться" за свои исходящие сообщения, пока не будет сообщенийосталось потреблять из очереди ssh.Это предотвращает своевременную доставку конфигураций устройства к конечной точке.Я наблюдаю за тем, чтобы каждая конфигурация появлялась в одно и то же время, когда все средние конечные точки потребляют свои сообщения.
Итак, мой вопрос: почему я вижу такое поведение спулинга?Я хотел бы, чтобы мои сообщения публиковались в тот момент, когда они готовы, и это поведение не предназначено.
конечная точка 1 - отправитель запросов резервного копирования сетевого устройства:
EventMachine.add_timer( 0 ) do
YAML::load_file( 'hosts.yaml' ).each do |fqdn,ip|
payload = { :fqdn => fqdn }
exchange.publish( payload.to_json, :routing_key => 'net.svc.ssh' )
end
end
конечная точка 2 - узел ssh
mq_queue.subscribe( :ack => false ) do |meta, mq_payload|
payload = JSON.parse( mq_payload)
# SSH stuff happens here
mq_channel.default_exchange.publish( payload.to_json,
:routing_key => 'net.svc.savefile',
)
end
end
конечная точка 3 - узел, который сохраняет конфигурацию устройства в файл
queue.subscribe( ) do |meta, mq_payload|
payload = JSON.parse( mq_payload )
payload[ "host" ].each do |host,hash|
File.open( File.join('backups', host), "w" ) do |file|
result = hash[ "result" ]
if( result[ "error" ].nil? )
file << result[ "show running-config" ].join( "\n" )
else
file << result[ "error" ]
end
end
end
end