Я новичок в сельдерее, и я был бы признателен за небольшую помощь с шаблоном проектирования (или примером кода) для работника, которого мне еще предстоит написать.
Ниже приведено описание желательных характеристик работника.
- Рабочий запустит задачу, которая собирает данные из бесконечного источника, генератора.
- Рабочее задание будет работать вечно, питаясь от генератора, если оно не направлено на остановку.
Рабочее задание должно корректно останавливаться при возникновении любого из следующих триггеров.
- Превышен лимит времени выполнения в секундах.
- Превышает количество итераций бесконечного цикла генератора.
- Клиент отправляет сообщение, инструктирующее рабочую задачу о немедленном завершении.
Ниже приведен код sudo, который, как мне кажется, необходим для обработки сценариев триггера 1 и 2.
Чего я не знаю, так это как я посылаю сигнал «немедленно закончить» от клиента и как он принимается и выполняется в рабочем задании.
Буду признателен за любой совет или пример кода.
from celery.task import task
from celery.exceptions import SoftTimeLimitExceeded
COUNTLIMIT = # some value sent to the worker task by the client
@task()
def getData():
try:
for count, data in enumerate(endlessGeneratorThing()):
# process data here
if count > COUNTLIMIT: # Handle trigger scenario 2
clean_up_task_nicely()
break
except SoftTimeLimitExceeded: # Handle trigger scenario 1
clean_up_task_nicely()