Я использую pika на python3, чтобы использовать очередь на RabbitMQ.Предполагается, что мой потребитель остановится / будет убит при определенных событиях, и поэтому он должен быть в состоянии самостоятельно обрабатывать закрытие каналов и соединений.После некоторого бега я обнаружил, что у меня было много «потребителей-зомби» (из-за отсутствия лучшего термина они являются зарегистрированными потребителями на Кролике, их необработанные сообщения лежат без дела, но часто уже не имеют соответствующего процесса)оставаясь без дела.
После некоторых экспериментов я обнаружил, что, когда я пытаюсь запустить channel.cancel()
, процесс просто зависает, пока что-то не приходит, чтобы убить его, и тогда Кролик считает, что потребитель все еще активенвремя (думаю, около 20 минут).
Мой код работает так:
def do_some_work(method_frame, body):
# something happens here
if condition:
logging.info("Closing up...")
requeue = channel.cancel()
logging.info("Consumer stopped, {0} messages sent back".format(requeue))
for method_frame, properties, body in channel.consume(args.queue):
do_some_work(method_frame, body)
Когда встречается condition
, я вижу первую строку журнала и запросы, чтобы получить большесообщения от Rabbit stop (по крайней мере, это то, что я могу сказать от Rabbit), но процесс фактически зависает, не закрывая каналы и соединение, пока не произойдет очистка от Rabbit и ОС.