from celery.task import Task
from django.db import transaction
class MyTask(Task):
# ...
def run(self, *args, **kwargs):
# doesn't work
with transaction.atomic():
super().run(*args, **kwargs)
celery_task = celery_app.task(ignore_result=True, base=MyTask)
@celery_task
# @transaction.atomic # this should work, but I want to add transaction through base task class
def foo_task():
pass
Мне нужно добавить элементарную транзакцию в каждую задачу с помощью celery_task
декоратора без использования дополнительных декораторов.