Bunny и RabbitMQ - Адаптация учебника WorkQueue для отмены подписки, когда очередь полностью проработана - PullRequest
0 голосов
/ 05 октября 2018

Я заполняю свою очередь, проверяю, есть ли у нее правильное количество заданий для работы, и затем работникам в параллелле присваивается значение prefetch(1), чтобы каждая из них выполняла только одну задачу за раз.

Я хочу каждуюработник выполняет свою задачу, отправляет подтверждение вручную и продолжает работать, извлекая из очереди, если есть больше работы.

Если работы больше нет, т. е. очередь пуста, я хочу, чтобы рабочий сценарий завершилсяup и return(0).

Итак, вот что у меня сейчас:

require 'bunny'

connection = Bunny.new("amqp://my_conn")
connection.start

channel = connection.create_channel
queue = channel.queue('my_queue_name')

channel.prefetch(1)
puts ' [*] Waiting for messages.'

begin
  payload = 'init'
  until queue.message_count == 0
    puts "worker working queue length is #{queue.message_count}"
    _delivery_info, _properties, payload = queue.pop
    unless payload.nil?
      puts " [x] Received #{payload}"
      raise "payload invalid" unless payload[/cucumber/]

      begin
        do_stuff(payload)
      rescue => e
        puts "Error running #{payload}: #{e.backtrace.join('\n')}"
        #failing stuff
      end
    end
    puts " [x] Done with #{payload}"
  end

  puts "done with queue"
  connection.close
  exit(0)
ensure
  connection.close
end 

Я все еще хочу убедиться, что я закончил, когда очередь пуста.Это пример с сайта RabbitMQ ... https://www.rabbitmq.com/tutorials/tutorial-two-ruby.html.У нас есть ряд вещей, которые мы хотим видеть в нашей рабочей очереди, наиболее важные из которых - ручные подтверждения.Но он не перестает работать, и мне нужно, чтобы это происходило программно, когда очередь сделана:

#!/usr/bin/env ruby
require 'bunny'

connection = Bunny.new(automatically_recover: false)
connection.start

channel = connection.create_channel
queue = channel.queue('task_queue', durable: true)

channel.prefetch(1)
puts ' [*] Waiting for messages. To exit press CTRL+C'

begin
  queue.subscribe(manual_ack: true, block: true) do |delivery_info, _properties, body|
    puts " [x] Received '#{body}'"
    # imitate some work
    sleep body.count('.').to_i
    puts ' [x] Done'
    channel.ack(delivery_info.delivery_tag)
  end
rescue Interrupt => _
  connection.close
end

Как можно адаптировать этот скрипт для выхода, когда очередь полностью обработана (всего 0 и 0 без упаковки))

1 Ответ

0 голосов
/ 15 октября 2018

Насколько я понимаю, вы хотите, чтобы ваш подписчик завершил работу, если в очереди RabbitMQ нет ожидающих сообщений.

Учитывая ваш второй сценарий, вы можете избежать передачи block: true, и это ничего не даст, когданет больше данных для обработки.В этом случае вы можете выйти из программы.

Это можно увидеть в документации: http://rubybunny.info/articles/queues.html#blocking_or_nonblocking_behavior

По умолчанию она не блокируется.

...