Celery Task Пользовательский метод отслеживания - PullRequest
0 голосов
/ 27 февраля 2019

Моя главная проблема заключается в том, что мне нужно знать, находится ли задача в очереди, запущена или отозвана.

Я не могу сделать это с сельдереем и редисом, потому что через 24 часа после того, как результаты в редисе, они удалены.

У меня были некоторые идеи, но я думаю, что наиболее надежным является отслеживание базы данных и ручное добавление критической информации, которая мне нужна для выполнения задачи, выполняемой пользователем.

Существуют методы для этого, которые могут выполняться до запуска задачи, и я также могу вручную работать с базой данных, когда я создаю задачу или отменяю их, верно?я не буду создавать новую строку для каждой задачи, но вместо этого просто обновлять строку для каждого пользователя, потому что меня интересует только последняя задача каждого пользователя.

1 Ответ

0 голосов
/ 27 февраля 2019

Возможно, вам придется объединить несколько подходов.Если ваши результаты истекают в бэкэнде (что разумно), вам придется использовать другое хранилище, например, базу данных для долгосрочного архивирования состояний задач.Для начала вы можете включить task_track_started, чтобы задачи сообщали о состоянии STARTED, когда рабочий начинает выполнение).Затем периодически проверяйте бэкэнд результатов на наличие обновлений состояния задач, которые не находятся в состоянии готовности (SUCCESS, FAILURE и REVOKED).Если они находятся в конечном состоянии, удалите результат из бэкэнда, используя метод forget().

Единственная проблема - в отмененных задачах.Если нет доступных рабочих, отмена задачи не имеет никакого эффекта (поэтому вы всегда должны ждать ответа при вызове отзыва).Если рабочие заняты и, таким образом, задача остается в очереди сообщений, рабочие просто отмечают, что такую ​​задачу следует отбрасывать, когда они забирают ее из очереди, но она сохраняется только в состоянии рабочего.Как только они его принимают, они сбрасывают задание, и в итоге результат содержит REVOKED статус.Важно отметить, что отозванные задачи поддерживаются только в рабочем состоянии, поэтому вы должны использовать параметр --statedb, чтобы сохранить состояние в случае сбоя рабочего.В противном случае уже отозванные задачи будут успешно обработаны тем же или другим работником.

Я полагаю, что вам лучше всего вызывать команду отзыва и, если вы получаете ответ от работников, задайте внутренний статус задачи в своембазы данных в что-то вроде FLAGGED_REVOKED.В цикле обновления состояния обновляйте статус отозванной задачи, только если это не PENDING.

. У меня есть простое приложение для планирования заданий, в котором APScheduler используется в качестве планировщика, а Celery - в качестве уровня выполнения.Информация о работах, прогонах работ и расписании хранятся в MongoDB.Вот код, который я использую для отмены задания:

database = scheduler._jobstores['default'].collection.database
collection = database['runs']

run = collection.find_one({'job_id': job_id, '_id': run_id})
if run.get('task_state') in ('PENDING', 'RECEIVED', 'STARTED', 'RETRY'):
    reply = celery.control.revoke(run['task_id'], terminate=terminate, reply=True)
    if reply:
        collection.update_one({'_id': run['_id']},
                              {'$set': {'task_state': 'FLAGGED_REVOKED'}})
    else:
        raise Exception('Failed to revoke the task (no reply received)')
else:
    raise Exception('Job execution cannot be canceled')

Это мой код обновления статуса (он сохраняется как внутреннее задание APScheduler для запуска каждые несколько секунд):

database = scheduler._jobstores['default'].collection.database
collection = database['runs']

runs = collection.find({
    'task_id': {'$exists': True},
    'task_state': {'$nin': ['SUCCESS', 'FAILURE', 'REVOKED']}
})
for run in runs:
    result = AsyncResult(run['task_id'],
                         backend=celery.backend, app=celery)
    if run['task_state'] == 'FLAGGED_REVOKED' and result.state == 'PENDING':
        update = {'task_state': 'FLAGGED_REVOKED'}
    else:
        update = {'task_state': result.state}
    if result.state == 'FAILURE':
        update['exception'] = str(result.result)
        update['traceback'] = result.traceback
    elif result.state == 'SUCCESS':
        update['result'] = result.result
    if result.date_done:
        date_done = dateparser.parse(result.date_done) \
            if isinstance(result.date_done, str) else result.date_done
        update['finish_time'] = date_done
    try:
        collection.update_one({'_id': run['_id']}, {'$set': update})
    except Exception as e:
        print('Failed to update task status: %s', str(e))
    else:
        if result.state in ['SUCCESS', 'FAILURE', 'REVOKED']:
            result.forget()
...