Я копался в модуле django_celery_results
и обнаружил, что он предоставляет модель TaskResult, которая предоставляет собственный интерфейс Django ORM для модели результатов. Таким образом, вы можете искать результат с помощью TaskResult.objects.filter
В качестве примечания, хотя документы Celery охватывают с модулем django_celery_results , они не упоминают эту модель TaskResult
или ее использование. Модель задокументирована в самих документах модуля .
Пример:
from django_celery_results.models import TaskResult
# Grab all tasks that have not been successful yet that have the
# PK for the comment object being deleted in their args
results = TaskResult.objects.filter(
task_args=(instance.pk, True or False),
task_name='theden_django.activity.tasks.create_comment_activity',
).exclude(
status='SUCCESS'
)
Для более полного примера это последний обработчик post_delete
для моего Комментария объект, который отзывает все задачи с идентификаторами, которые были найдены, наряду с задачей post_save
, которая их создает:
from django.dispatch import receiver
from django.db.models import signals
from django_celery_results.models import TaskResult
from theden_django.core import celery_app
from theden_django.comments.models import Comment
from theden_django.activity.tasks import create_comment_activity
@receiver(signals.post_save, sender=Comment)
def schedule_comment_activity_task(sender, instance, created, **kwargs):
"""Creates an activity when comments are created"""
del sender
del kwargs
if created:
create_comment_activity.delay(instance.pk, created)
@receiver(signals.post_delete, sender=Comment)
def revoke_comment_activity_task(sender, instance, **kwargs):
"""Ensures any pending or running tasks are revoked when a comment is deleted"""
del sender
del kwargs
results = TaskResult.objects.filter(
task_args=(instance.pk, True or False),
task_name='theden_django.activity.tasks.create_comment_activity',
).exclude(
status='SUCCESS'
)
for result in results:
celery_app.control.revoke(task_id=result.task_id)
Caveat
Эта реализация имеет огромное предостережение, так как он найдет только те задачи, которые были пробованы хотя бы один раз. Я все еще думаю, что должен быть способ с событиями API. Одним из смягчающих факторов для этого является сделать отмену задачи самой задержанной задачей, что должно гарантировать, что задача будет выполнена после всех задач в очереди. Для особой осторожности я установил эту задачу с задержкой в 5 минут, так что даже если что-то ужасно медленно работает на работнике, задачи должны быть записаны в бэкэнд-результат к моменту запуска задачи отзыва.
handlers.py
:
from django.dispatch import receiver
from django.db.models import signals
from django_celery_results.models import TaskResult
from theden_django.core import celery_app
from theden_django.comments.models import Comment
from theden_django.activity.tasks import create_comment_activity
@receiver(signals.post_save, sender=Comment)
def schedule_comment_activity_task(sender, instance, created, **kwargs):
"""Creates an activity when comments are created"""
del sender
del kwargs
if created:
create_comment_activity.delay(instance.pk, created)
@receiver(signals.post_delete, sender=Comment)
def revoke_comment_activity_task(sender, instance, **kwargs):
"""Ensures any pending or running tasks are revoked when a comment is deleted"""
del sender
del kwargs
revoke_pending_activity_tasks.delay(
(instance.pk, True or False),
'theden_django.activity.tasks.create_comment_activity',
countdown=300
)
tasks.py
:
from __future__ import absolute_import, unicode_literals
import logging
from psycopg2 import OperationalError as psycopg2OperationalError
from django.db import Error
from django.db.models import ObjectDoesNotExist
from django.db.utils import OperationalError
from celery import shared_task
from django_celery_results.models import TaskResult
from theden_django.core import celery_app
LOGGER = logging.getLogger()
@shared_task(bind=True, max_retries=None)
def create_comment_activity(self, comment_pk, created):
"""
| Async task that creates an activity for a comment.
:param `integer` comment_pk:
:param `boolean` created:
:return:
"""
try:
instance = Comment.objects.get(pk=comment_pk)
except ObjectDoesNotExist as missing_comment_exception:
LOGGER.error(
'Unable to find comment %s. Rescheduling task.',
comment_pk
)
self.retry(exc=missing_comment_exception, countdown=60)
except (Error, OperationalError, psycopg2OperationalError) as db_conn_exception:
LOGGER.error(
'Unable to create comment activity for %s due to database error. Rescheduling task.',
comment_pk
)
self.retry(exc=db_conn_exception, countdown=60)
else:
try:
create_activity(instance, instance.user.pk, created, 'comment')
except ObjectDoesNotExist as missing_member_exception:
LOGGER.error(
'Unable to find actor %s for activity. Rescheduling',
comment_pk
)
self.retry(exc=missing_member_exception, countdown=60)
except (Error, OperationalError, psycopg2OperationalError) as db_conn_exception:
LOGGER.error(
'Unable to create comment activity for %s due to database error. Rescheduling task.',
comment_pk
)
self.retry(exc=db_conn_exception, countdown=60)
@shared_task(bind=True, max_retries=None)
def revoke_pending_activity_tasks(task_args, task_name):
"""
| Async task that cancels other tasks
:param task_args:
:param task_name:
:return:
"""
results = TaskResult.objects.filter(
task_args=task_args,
task_name=task_name,
).exclude(
status='SUCCESS'
)
for result in results:
celery_app.control.revoke(task_id=result.task_id)