У меня есть сценарий, в котором мне нужно очень быстро распределять и обрабатывать задания.В очереди у меня будет около 45 заданий, и я смогу одновременно обработать около 20 (5 машин по 4 ядра).Каждое задание занимает различное количество времени, и для усложнения вопросов сборка мусора является проблемой, поэтому мне нужно иметь возможность перевести потребителя в автономный режим для сбора мусора.
В настоящее время у меня все работает с pop (каждый потребительпоявляется каждые 5 мс).Это кажется нежелательным, потому что это переводит 600 запросов pop в секунду в rabbitmq.
Мне бы очень хотелось, если бы была команда pop, которая действовала бы как подписка, но только для одного сообщения.(процесс заблокировал бы, ожидая ввода от соединения rabbitMQ, через что-то похожее на Kernel.select)
Я пытался обмануть гем AMQP, чтобы сделать что-то подобное, но он не работает: я могу 'Кажется, отписаться, пока очередь не пуста и больше не отправляются сообщения потребителю.Я боюсь, что другие способы отписки потеряют сообщения.
потребление_1.rb:
require "amqp"
EventMachine.run do
puts "connecting..."
connection = AMQP.connect(:host => "localhost", :user => "guest", :pass => "guest", :vhost => "/")
puts "Connected to AMQP broker"
channel = AMQP::Channel.new(connection)
queue = channel.queue("tasks", :auto_delete => true)
exchange = AMQP::Exchange.default(channel)
queue.subscribe do |payload|
puts "Received a message: #{payload}."
queue.unsubscribe { puts "unbound" }
sleep 3
end
end
consumer_many.rb:
require "amqp"
# Imagine the command is something CPU - intensive like image processing.
command = "sleep 0.1"
EventMachine.run do
puts "connecting..."
connection = AMQP.connect(:host => "localhost", :user => "guest", :pass => "guest", :vhost => "/")
puts "Connected to AMQP broker"
channel = AMQP::Channel.new(connection)
queue = channel.queue("tasks", :auto_delete => true)
exchange = AMQP::Exchange.default(channel)
queue.subscribe do |payload|
puts "Received a message: #{payload}."
end
end
provider.rb:
require "amqp"
i = 0
EventMachine.run do
connection = AMQP.connect(:host => "localhost", :user => "guest", :pass => "guest", :vhost => "/")
puts "Connected to AMQP broker"
channel = AMQP::Channel.new(connection)
queue = channel.queue("tasks", :auto_delete => true)
exchange = AMQP::Exchange.default(channel)
EM.add_periodic_timer(1) do
msg = "Message #{i}"
i+=1
puts "~ publishing #{msg}"
end
end
Я запущу Потребительские_мани.рб и продюсер.рб.Сообщения будут передаваться, как и ожидалось.
Когда я запускаю потребление_1.rb, оно получает все остальные сообщения (как и ожидалось).Но он НИКОГДА не отписывается, потому что он никогда не заканчивает обработку всех своих сообщений ... и так далее.
Как я могу получить consumer_1.rb, чтобы подписаться на очередь, получить одно сообщение, а затем удалить себякольца балансировки нагрузки, чтобы он мог выполнять свою работу без потери каких-либо дополнительных ожидающих заданий, которые могли бы находиться в очереди и в противном случае были бы запланированы для отправки процессу?
Тим