Возможно, вам придется объединить несколько подходов.Если ваши результаты истекают в бэкэнде (что разумно), вам придется использовать другое хранилище, например, базу данных для долгосрочного архивирования состояний задач.Для начала вы можете включить task_track_started
, чтобы задачи сообщали о состоянии STARTED
, когда рабочий начинает выполнение).Затем периодически проверяйте бэкэнд результатов на наличие обновлений состояния задач, которые не находятся в состоянии готовности (SUCCESS
, FAILURE
и REVOKED
).Если они находятся в конечном состоянии, удалите результат из бэкэнда, используя метод forget()
.
Единственная проблема - в отмененных задачах.Если нет доступных рабочих, отмена задачи не имеет никакого эффекта (поэтому вы всегда должны ждать ответа при вызове отзыва).Если рабочие заняты и, таким образом, задача остается в очереди сообщений, рабочие просто отмечают, что такую задачу следует отбрасывать, когда они забирают ее из очереди, но она сохраняется только в состоянии рабочего.Как только они его принимают, они сбрасывают задание, и в итоге результат содержит REVOKED
статус.Важно отметить, что отозванные задачи поддерживаются только в рабочем состоянии, поэтому вы должны использовать параметр --statedb
, чтобы сохранить состояние в случае сбоя рабочего.В противном случае уже отозванные задачи будут успешно обработаны тем же или другим работником.
Я полагаю, что вам лучше всего вызывать команду отзыва и, если вы получаете ответ от работников, задайте внутренний статус задачи в своембазы данных в что-то вроде FLAGGED_REVOKED
.В цикле обновления состояния обновляйте статус отозванной задачи, только если это не PENDING
.
. У меня есть простое приложение для планирования заданий, в котором APScheduler используется в качестве планировщика, а Celery - в качестве уровня выполнения.Информация о работах, прогонах работ и расписании хранятся в MongoDB.Вот код, который я использую для отмены задания:
database = scheduler._jobstores['default'].collection.database
collection = database['runs']
run = collection.find_one({'job_id': job_id, '_id': run_id})
if run.get('task_state') in ('PENDING', 'RECEIVED', 'STARTED', 'RETRY'):
reply = celery.control.revoke(run['task_id'], terminate=terminate, reply=True)
if reply:
collection.update_one({'_id': run['_id']},
{'$set': {'task_state': 'FLAGGED_REVOKED'}})
else:
raise Exception('Failed to revoke the task (no reply received)')
else:
raise Exception('Job execution cannot be canceled')
Это мой код обновления статуса (он сохраняется как внутреннее задание APScheduler для запуска каждые несколько секунд):
database = scheduler._jobstores['default'].collection.database
collection = database['runs']
runs = collection.find({
'task_id': {'$exists': True},
'task_state': {'$nin': ['SUCCESS', 'FAILURE', 'REVOKED']}
})
for run in runs:
result = AsyncResult(run['task_id'],
backend=celery.backend, app=celery)
if run['task_state'] == 'FLAGGED_REVOKED' and result.state == 'PENDING':
update = {'task_state': 'FLAGGED_REVOKED'}
else:
update = {'task_state': result.state}
if result.state == 'FAILURE':
update['exception'] = str(result.result)
update['traceback'] = result.traceback
elif result.state == 'SUCCESS':
update['result'] = result.result
if result.date_done:
date_done = dateparser.parse(result.date_done) \
if isinstance(result.date_done, str) else result.date_done
update['finish_time'] = date_done
try:
collection.update_one({'_id': run['_id']}, {'$set': update})
except Exception as e:
print('Failed to update task status: %s', str(e))
else:
if result.state in ['SUCCESS', 'FAILURE', 'REVOKED']:
result.forget()