На самом деле, углубившись в это, я нашел способ добиться этого:
Есть несколько хитростей:
установить состояние задачи с помощью update_state
убедитесь, что аргумент meta
соответствует тому, что ожидает Celery для состояния FAILURE
поручить Celery удалить сообщение из очереди, но отключить запись состояния внутренней задачи Celery с помощью Ignore()
Например:
from celery import states
from celery.exceptions import Ignore
@app.task(bind=True)
def task(self):
try:
raise ValueError('Some error')
except Exception as ex:
self.update_state(
state=states.FAILURE,
meta={
'exc_type': type(ex).__name__,
'exc_message': traceback.format_exc().split('\n')
'custom': '...'
})
raise Ignore()
При получении AsyncResult
дополнительные пользовательские данные доступны через task.backend.get()
вызов:
task = tasks.task.s().delay()
meta = task.backend.get(task.backend.get_key_for_task(task.id))
json.loads(meta.decode('utf8'))['result']['custom']
Если вам нужна дополнительная справочная информация, я написал об этом в блоге: https://www.distributedpython.com/2018/09/28/celery-task-states - спасибо за вдохновение; -)