Самый простой способ сделать это - использовать AMQP для очередей сообщений и позволить брокеру сообщений позаботиться о них. Я реализовал аналогичную систему, используя RabbitMQ в качестве посредника сообщений с устойчивыми постоянными очередями. Сообщения даже пережили сбой серверного программного обеспечения RabbitMQ, когда я использовал устаревшую версию сервера 1.72 на виртуальном сервере Linux с только 512 МБ ОЗУ и около миллиона сообщений в игре.
То, как я это делаю, заключается в том, что каждый тип работника получает сообщения из другой очереди. Если мне требуется более одного работника этого типа, то очередь сообщений автоматически переходит в циклический режим, и если работник не может завершить обработку сообщения, он просто не подтверждает его, и он возвращается в очередь.
Я написал небольшой модуль shim с примерно 80 строками кода, чтобы он стоял перед kombu
, а позже переписал его, чтобы использовать py-amqplib
. Если бы я знал о haigha
ранее, я бы использовал это, так как это очень близко соответствует документу спецификаций AMQP.
Я не рекомендую kombu, потому что он настолько сложен для отладки и странным образом отличается от стандарта AMQP. Взгляните на haigha
, потому что, хотя документация представляет собой не более одного примера фрагмента кода на PyPi, она лучше документирована, чем либо kombu, либо amqplib, потому что вы можете использовать спецификации AMQP в качестве документа haigha.