Как найти идентификаторы задач для событий задач Celery на основе переданных им аргументов? - PullRequest
0 голосов
/ 11 февраля 2020

Я застрял с чем-то, что мне кажется простым. Я пытаюсь определить, могу ли я найти все идентификаторы задач с известным набором аргументов. Возможно ли это с помощью API Celery 4.4 или мне нужно кодировать свой собственный интерфейс к таблице результатов в Django ORM и искать себя?

Мой вариант использования - один из тех, которые, как я думаю, могут быть обычный. У меня есть задачи, которые запланированы для создания объектов деятельности и уведомлений для каналов и профилей пользователей. Они настроены на повторную попытку до успешного завершения, так как я должен убедиться, что эти объекты созданы.

Однако при высокой нагрузке, когда асинхронные задачи могут отставать, пользователь может сделать что-то вроде публикации комментария, а затем удалить его до создания объектов действия или уведомлений. Ситуация, подобная этой, потребовала бы, чтобы задачи были отменены или помещены в очередь навсегда.

Очевидным решением для меня было вызвать revoke () для идентификаторов задач, которые имели идентификатор объекта в своих аргументах из post_delete сигнализирует после получения списка идентификаторов задач из API событий, однако я не смог четко найти интерфейс в API API, который позволил бы мне это сделать. Я думаю, что мой ответ может быть в celery.events.state.Task , но если это интерфейс для запроса Задач, я не уверен, что args доступен для поиска. Моим задачам активности передаются идентификатор объекта и логическое значение create из сигнала Django post_save.

Я мог бы взять все задачи соответствующего типа, например project.activity.create_comment_activity, но тогда мне пришлось бы oop выполнить каждую задачу, распаковать аргументы и проверить их, что кажется ужасным для масштабируемости.

Я, должно быть, здесь упустил хитрость, или я просто написал свой собственный интерфейс для бэкэнда результатов и поиска задач с помощью Django ORM.

1 Ответ

0 голосов
/ 11 февраля 2020

Я копался в модуле django_celery_results и обнаружил, что он предоставляет модель TaskResult, которая предоставляет собственный интерфейс Django ORM для модели результатов. Таким образом, вы можете искать результат с помощью TaskResult.objects.filter

В качестве примечания, хотя документы Celery охватывают с модулем django_celery_results , они не упоминают эту модель TaskResult или ее использование. Модель задокументирована в самих документах модуля .

Пример:

from django_celery_results.models import TaskResult

# Grab all tasks that have not been successful yet that have the 
# PK for the comment object being deleted in their args
results = TaskResult.objects.filter(
    task_args=(instance.pk, True or False),
    task_name='theden_django.activity.tasks.create_comment_activity',
).exclude(
    status='SUCCESS'
)

Для более полного примера это последний обработчик post_delete для моего Комментария объект, который отзывает все задачи с идентификаторами, которые были найдены, наряду с задачей post_save, которая их создает:


from django.dispatch import receiver
from django.db.models import signals
from django_celery_results.models import TaskResult
from theden_django.core import celery_app
from theden_django.comments.models import Comment
from theden_django.activity.tasks import create_comment_activity

@receiver(signals.post_save, sender=Comment)
def schedule_comment_activity_task(sender, instance, created, **kwargs):
    """Creates an activity when comments are created"""
    del sender
    del kwargs
    if created:
        create_comment_activity.delay(instance.pk, created)

@receiver(signals.post_delete, sender=Comment)
def revoke_comment_activity_task(sender, instance, **kwargs):
    """Ensures any pending or running tasks are revoked when a comment is deleted"""
    del sender
    del kwargs

    results = TaskResult.objects.filter(
        task_args=(instance.pk, True or False),
        task_name='theden_django.activity.tasks.create_comment_activity',
    ).exclude(
        status='SUCCESS'
    )

    for result in results:
        celery_app.control.revoke(task_id=result.task_id)

Caveat

Эта реализация имеет огромное предостережение, так как он найдет только те задачи, которые были пробованы хотя бы один раз. Я все еще думаю, что должен быть способ с событиями API. Одним из смягчающих факторов для этого является сделать отмену задачи самой задержанной задачей, что должно гарантировать, что задача будет выполнена после всех задач в очереди. Для особой осторожности я установил эту задачу с задержкой в ​​5 минут, так что даже если что-то ужасно медленно работает на работнике, задачи должны быть записаны в бэкэнд-результат к моменту запуска задачи отзыва.

handlers.py:

from django.dispatch import receiver
from django.db.models import signals
from django_celery_results.models import TaskResult
from theden_django.core import celery_app
from theden_django.comments.models import Comment
from theden_django.activity.tasks import create_comment_activity

@receiver(signals.post_save, sender=Comment)
def schedule_comment_activity_task(sender, instance, created, **kwargs):
    """Creates an activity when comments are created"""
    del sender
    del kwargs
    if created:
        create_comment_activity.delay(instance.pk, created)


@receiver(signals.post_delete, sender=Comment)
def revoke_comment_activity_task(sender, instance, **kwargs):
    """Ensures any pending or running tasks are revoked when a comment is deleted"""
    del sender
    del kwargs

    revoke_pending_activity_tasks.delay(
        (instance.pk, True or False),
        'theden_django.activity.tasks.create_comment_activity',
        countdown=300
    )

tasks.py:

from __future__ import absolute_import, unicode_literals

import logging
from psycopg2 import OperationalError as psycopg2OperationalError
from django.db import Error
from django.db.models import ObjectDoesNotExist
from django.db.utils import OperationalError
from celery import shared_task
from django_celery_results.models import TaskResult
from theden_django.core import celery_app

LOGGER = logging.getLogger()

@shared_task(bind=True, max_retries=None)
def create_comment_activity(self, comment_pk, created):
    """
    | Async task that creates an activity for a comment.
    :param `integer` comment_pk:
    :param `boolean` created:
    :return:
    """
    try:
        instance = Comment.objects.get(pk=comment_pk)
    except ObjectDoesNotExist as missing_comment_exception:
        LOGGER.error(
            'Unable to find comment %s. Rescheduling task.',
            comment_pk
        )
        self.retry(exc=missing_comment_exception, countdown=60)
    except (Error, OperationalError, psycopg2OperationalError) as db_conn_exception:
        LOGGER.error(
            'Unable to create comment activity for %s due to database error. Rescheduling task.',
            comment_pk
        )
        self.retry(exc=db_conn_exception, countdown=60)
    else:
        try:
            create_activity(instance, instance.user.pk, created, 'comment')
        except ObjectDoesNotExist as missing_member_exception:
            LOGGER.error(
                'Unable to find actor %s for activity. Rescheduling',
                comment_pk
            )
            self.retry(exc=missing_member_exception, countdown=60)
        except (Error, OperationalError, psycopg2OperationalError) as db_conn_exception:
            LOGGER.error(
                'Unable to create comment activity for %s due to database error. Rescheduling task.',
                comment_pk
            )
            self.retry(exc=db_conn_exception, countdown=60)


@shared_task(bind=True, max_retries=None)
def revoke_pending_activity_tasks(task_args, task_name):
    """
    | Async task that cancels other tasks
    :param task_args:
    :param task_name:
    :return:
    """

    results = TaskResult.objects.filter(
        task_args=task_args,
        task_name=task_name,
    ).exclude(
        status='SUCCESS'
    )

    for result in results:
        celery_app.control.revoke(task_id=result.task_id)

...