После публикации моего вопроса мне пришло в голову, что мой код выше полагается на то, что я могу либо воссоздать объект request
, либо выбрать его. Ни один из этих подходов не был возможен. Поэтому мне нужно было обернуть только кусок кода, который потребовал бы времени для выполнения задачи сельдерея. Я обнаружил, что могу вернуть результат задачи сельдерея в методе сериализатора create
вместо обычного объекта БД.
Я должен упомянуть, что этот сериализатор никогда не возвращает объект БД в любом случае, поскольку он фактически объединяет данные из двух внешних API в мой API. Я не показываю этот код.
Мои представления были сильно настроены, однако они в значительной степени функционируют как нормальные представления.
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.__accepted = False
def post(self, request, *args, **kwargs):
self.__accepted = False # Use the normal serializer
self.create(request, *args, **kwargs)
self.__accepted = True # Use the JobQueue serializer
return self.create_accepted(request, *args, **kwargs)
def create(self, request, *args, **kwargs):
serializer = self.get_serializer(data=request.data)
serializer.is_valid(raise_exception=True)
self.perform_create(serializer)
# Do not return a Response
def create_accepted(self, request, *args, **kwargs):
data = {}
data['endpoint_name'] = request.data.get('name')
# Add any data needed to create a JobQueue object.
serializer = self.get_serializer(data=data)
serializer.is_valid(raise_exception=True)
self.perform_create(serializer)
data = serializer.data
headers = self.get_success_headers(data)
return Response(data, status=status.HTTP_202_ACCEPTED,
headers=headers)
def get_serializer_class(self):
serializer = None
if self.__accepted:
if self.request.version == Decimal("1"):
serializer = JobQueueSerializerVer01
else:
if self.request.version == Decimal("1"):
serializer = SomeSerializerVer01
return serializer
Теперь меняется сериализатор:
class SomeSerializerVer01(serializers.Serializer):
def create(self, validated_data):
# Call the task
return wait_for_long_running_code.delay(
validated_data, self.initial_data)
def create_after_task(self, validated_data, initial_data):
self.initial_data = initial_data
self._create_or_update_job_queue(
name, job_status=JobQueue.INPROGRESS)
# Do what you need to do here
data = {}
# Update the JobQueue DB object.
self._create_or_update_job_queue(
name, job_status=JobQueue.SUCCESS,
job_ended=datetime.datetime.now(tzutc()))
return data
def _create_or_update_job_queue(self, name, **kwargs):
trx = JobQueue.objects.create_transaction(
name, Endpoint.HOST_GROUP, self.get_user_object(), **kwargs)
return trx
Теперь задание:
@shared_task(bind=True, default_retry_delay=15, max_retry=8)
def wait_for_long_runninf_code(self, validated_data, initial_data):
from your.path import SomeSerializerVer01
ser = SomeSerializerVer01()
result = {}
try:
result = ser.create_after_task(validated_data, initial_data)
except Exception:
self.retry(exc=e)
return result
И это все. Некоторым из того, что я делаю, вам, возможно, не нужно делать, например, прохождение диктанта initial_data
. Не все показано выше, например, представление и сериализатор для объекта JobQueue
DB.