Правильный шаблон дизайна для 202 Принятых ответов - PullRequest
1 голос
/ 06 марта 2019

У меня есть настройка сельдерея shared_task, чтобы повторить попытку до 10 раз, если это не удалось в первый раз. Первоначальный оператор журнала выполняется только один раз. Ни одно из исключений никогда не возникает, а также встроенное try/else. Оператор result = LdapHostGroupView().start(data, username, version) действительно выполняется и из записей журнала показывает, что он успешно выполнен, но окончательный else никогда не выполняется.

Что здесь происходит?

@shared_task(bind=True, default_retry_delay=15, max_retry=10)
def host_accepted(self, data, username, version):
    from .api.views import LdapHostGroupView
    name = data.get('name', '')
    version = Decimal(version)
    log.debug("name: %s, version: %s, version type: %s, data: %s",
              name, version, type(version), data)

    try:
        obj = Transaction.objects.get(endpoint_name=name)
    except Transaction.DoesNotExist as e:
        msg = "Could not find transaction '{}'".format(name)
        log.critical(msg)
        syslog.critical(msg)
    else:
        try:
            result = LdapHostGroupView().start(data, username, version)
        except RealmBundleDoesNotExist as e:
            log.debug("Bundle does not exist yet.")
            obj.job_summary += str(e) + '\n'
            obj.job_status = Transaction.INPROGRESS
            obj.save()
            self.retry(exc=e) # ** self.request.retries)
        except (RealmCriticalException, ValidationError) as e:
            error = e.get_full_details()
            log.debug("Host Accepted error: %s", error)

            if isinstance(error, dict):
                for field, values in error.items():
                    for value in values:
                        ed = value.get('message')

                        if isinstance(ed, ErrorDetail):
                            item = str(ed)
                        else:
                            item = value

                        msg = "Field '{}' has error: {}\n".format(field, item)
                        obj.job_summary += msg
            else:
                obj.job_summary += "Had error with no message.\n"

            obj.job_status = Transaction.FAILURE
            obj.save()
        else:
            log.info("Celery task 'host_accepted' executed at %s, "
                     "returned %s, incoming data %s",
                     datetime.now(tzutc()).isoformat(), result, data)

            # Check the result object.

            obj.job_status = Transaction.SUCCESS
            obj.save()

это называется в представлении Джанго так:

   host_accepted.delay(request.data, request.user.username, request.version)

1 Ответ

0 голосов
/ 07 марта 2019

После публикации моего вопроса мне пришло в голову, что мой код выше полагается на то, что я могу либо воссоздать объект request, либо выбрать его. Ни один из этих подходов не был возможен. Поэтому мне нужно было обернуть только кусок кода, который потребовал бы времени для выполнения задачи сельдерея. Я обнаружил, что могу вернуть результат задачи сельдерея в методе сериализатора create вместо обычного объекта БД.

Я должен упомянуть, что этот сериализатор никогда не возвращает объект БД в любом случае, поскольку он фактически объединяет данные из двух внешних API в мой API. Я не показываю этот код.

Мои представления были сильно настроены, однако они в значительной степени функционируют как нормальные представления.

def __init__(self, *args, **kwargs):
    super().__init__(*args, **kwargs)
    self.__accepted = False

def post(self, request, *args, **kwargs):
    self.__accepted = False # Use the normal serializer
    self.create(request, *args, **kwargs)
    self.__accepted = True # Use the JobQueue serializer
    return self.create_accepted(request, *args, **kwargs)

def create(self, request, *args, **kwargs):
    serializer = self.get_serializer(data=request.data)
    serializer.is_valid(raise_exception=True)
    self.perform_create(serializer)
    # Do not return a Response

def create_accepted(self, request, *args, **kwargs):
    data = {}
    data['endpoint_name'] = request.data.get('name')
    # Add any data needed to create a JobQueue object.
    serializer = self.get_serializer(data=data)
    serializer.is_valid(raise_exception=True)
    self.perform_create(serializer)
    data = serializer.data
    headers = self.get_success_headers(data)
    return Response(data, status=status.HTTP_202_ACCEPTED,
                    headers=headers)

def get_serializer_class(self):
    serializer = None

    if self.__accepted:
        if self.request.version == Decimal("1"):
            serializer = JobQueueSerializerVer01
    else:
        if self.request.version == Decimal("1"):
            serializer = SomeSerializerVer01

    return serializer

Теперь меняется сериализатор:

class SomeSerializerVer01(serializers.Serializer):

    def create(self, validated_data):
        # Call the task
        return wait_for_long_running_code.delay(
            validated_data, self.initial_data)

    def create_after_task(self, validated_data, initial_data):
        self.initial_data = initial_data
        self._create_or_update_job_queue(
            name, job_status=JobQueue.INPROGRESS)
        # Do what you need to do here
        data = {}

        # Update the JobQueue DB object.
        self._create_or_update_job_queue(
            name, job_status=JobQueue.SUCCESS,
            job_ended=datetime.datetime.now(tzutc()))

        return data

    def _create_or_update_job_queue(self, name, **kwargs):
        trx = JobQueue.objects.create_transaction(
            name, Endpoint.HOST_GROUP, self.get_user_object(), **kwargs)
        return trx

Теперь задание:

@shared_task(bind=True, default_retry_delay=15, max_retry=8)
def wait_for_long_runninf_code(self, validated_data, initial_data):
    from your.path import SomeSerializerVer01
    ser = SomeSerializerVer01()
    result = {}

    try:
       result = ser.create_after_task(validated_data, initial_data)
    except Exception:
       self.retry(exc=e)

    return result

И это все. Некоторым из того, что я делаю, вам, возможно, не нужно делать, например, прохождение диктанта initial_data. Не все показано выше, например, представление и сериализатор для объекта JobQueue DB.

...