параллельные задачи сельдерея выполняются и сохраняют результат, но .get не работает - PullRequest
0 голосов
/ 25 сентября 2018

Я написал класс задачи Celery следующим образом:

myapp.tasks.py

from __future__ import absolute_import, unicode_literals
from .services.celery import app
from .services.command_service import CommandService
from exceptions.exceptions import *
from .models import Command


class CustomTask(app.Task):

    def run(self, json_string, method_name, cmd_id: int):
        command_obj = Command.objects.get(id=cmd_id)  # type: Command
        try:
            val = eval('CommandService.{}(json_string={})'.format(method_name, json_string))
            status, error = 200, None
        except Exception as e:
            auto_retry = command_obj.auto_retry
            if auto_retry and isinstance(e, CustomError):
                command_obj.retry_count += 1
                command_obj.save()
                return self.retry(countdown=CustomTask._backoff(command_obj.retry_count), exc=e)
            elif auto_retry and isinstance(e, AnotherCustomError) and command_obj.retry_count == 0:
                command_obj.retry_count += 1
                command_obj.save()
                print("RETRYING NOW FOR DEVICE CONNECTION ERROR. TRANSACTION: {} || IP: {}".format(command_obj.transaction_id,
                                                                                                command_obj.device_ip))
                return self.retry(countdown=command_obj.retry_count*2, exc=e)
            val = None
            status, error = self._find_status_code(e)

        return_dict = {"error": error, "status_code": status, "result": val}
        return return_dict

    @staticmethod
    def _backoff(attempts):
        return 2 ** attempts

    @staticmethod
    def _find_status_code(exception):
        if isinstance(exception, APIException):
            detail = exception.default_detail if exception.detail is None else exception.detail
            return exception.status_code, detail

        return 500, CustomTask._get_generic_exc_msg(exception)

    @staticmethod
    def _get_generic_exc_msg(exc: Exception):
        s = ""
        try:
            for msg in exc.args:
                s += msg + ". "
        except Exception:
            s = str(exc)
        return s


CustomTask = app.register_task(CustomTask())

Определение приложения Celery:

from __future__ import absolute_import, unicode_literals
import os
from celery import Celery, Task
from django.conf import settings

# set the default Django settings module for the 'celery' program.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'myapp.settings')

_celery_broker = settings.CELERY_BROKER  <-- my broker is amqp://username:password@localhost:5672/myhost
app = Celery('myapp', broker=_celery_broker, backend='rpc://', include=['myapp.tasks', 'myapp.controllers'])
app.config_from_object('django.conf:settings', namespace='CELERY')

app.autodiscover_tasks(['myapp'])
app.conf.update(
    result_expires=4800,
    task_acks_late=True
)

my init .py учебник рекомендуется:

from .celery import app as celery_app

__all__ = ['celery_app']

Контроллер, который выполняет задачу:

from __future__ import absolute_import, unicode_literals
from .services.log_service import LogRunner
from myapp.services.command_service import CommandService
from exceptions.exceptions import *
from myapp.services.celery import app
from myapp.services.tasks import MyTask
from .models import Command

class MyController:
    def my_method(self, json_string):
        <non-async set up stuff here>

        cmd_obj = Command.objects.create(<stuff>)  # type: Command
        task_exec = MyTask.delay(json_string, MyController._method_name, cmd_obj.id)
        cmd_obj.task_id = task_exec
        try:
            return_dict = task_exec.get()
        except Exception as e:
            self._logger.error("ERROR: IP: {} and transaction: {}. Error Type: {}, "
                            "Celery Error: {}".format(ip_addr, transaction_id, type(e), e))
            status_code, error = self._find_status_code(e)
            return_dict = {"error": error, "status_code": status_code, "result": None}
        return return_dict

** Так вот моя проблема: **

Когда я запускаю этот контроллер Django, нажимая на представление одним запросом, один за другим, он работает отлично .

Тем не менее, внешний сервис янажатие вызовет ошибку для 2 одновременных запросов (и это ожидается - это нормально).После получения ошибки я повторяю свою задачу автоматически.

Вот странная часть После повторной попытки .get(), установленный в моем контроллере , перестает работать для всех одновременных запросов.Мой контроллер просто висит там!И я знаю, что сельдерей на самом деле выполняет задачу!Вот журналы запуска сельдерея:

[2018-09-25 19:10:24,932: INFO/MainProcess] Received task: myapp.tasks.MyTask[bafd62b6-7e29-4c39-86ff-fe903d864c4f]  
[2018-09-25 19:10:25,710: INFO/MainProcess] Received task: myapp.tasks.MyTask[8d3b4279-0b7e-48cf-b45d-0f1f89e213d4]  <-- THIS WILL FAIL BUT THAT IS OK
[2018-09-25 19:10:25,794: ERROR/ForkPoolWorker-1] Could not connect to device with IP <some ip> at all. Retry Later plase
[2018-09-25 19:10:25,798: WARNING/ForkPoolWorker-1] RETRYING NOW FOR DEVICE CONNECTION ERROR. TRANSACTION: b_txn || IP: <some ip>
[2018-09-25 19:10:25,821: INFO/MainProcess] Received task: myapp.tasks.MyTask[8d3b4279-0b7e-48cf-b45d-0f1f89e213d4]  ETA:[2018-09-25 19:10:27.799473+00:00] 
[2018-09-25 19:10:25,823: INFO/ForkPoolWorker-1] Task myapp.tasks.MyTask[8d3b4279-0b7e-48cf-b45d-0f1f89e213d4] retry: Retry in 2s: AnotherCustomError('Could not connect to IP <some ip> at all.',)
[2018-09-25 19:10:27,400: INFO/ForkPoolWorker-2] executed command some command at IP <some ip> 
[2018-09-25 19:10:27,418: INFO/ForkPoolWorker-2] Task myapp.tasks.MyTask[bafd62b6-7e29-4c39-86ff-fe903d864c4f] succeeded in 2.4829552830196917s: {'error': None, 'status_code': 200, 'result': True}
<some command output here from a successful run>  **<-- belongs to task bafd62b6-7e29-4c39-86ff-fe903d864c4f**

[2018-09-25 19:10:31,058: INFO/ForkPoolWorker-2] executed some command at  IP <some ip> 
[2018-09-25 19:10:31,059: INFO/ForkPoolWorker-2] Task command_runner.tasks.MyTask[8d3b4279-0b7e-48cf-b45d-0f1f89e213d4] succeeded in 2.404364461021032s: {'error': None, 'status_code': 200, 'result': True}
<some command output here from a successful run> **<-- belongs to task 8d3b4279-0b7e-48cf-b45d-0f1f89e213d4 which errored and retried itself**

Итак, как вы можете видеть, задача выполняется на сельдерее! Просто .get(), установленный в моем контроллере, не можетчтобы восстановить эти результаты - независимо от успешных задач или ошибочных задач.

Часто ошибка, возникающая при выполнении одновременных запросов Error: "Received 0x50 while expecting 0xce". Что это?что это значит? Опять же, как ни странно, все это работает при выполнении одного запроса за другим без обработки Django нескольких входящих запросов.Хотя я не смог повторить попытку разовых запросов.

1 Ответ

0 голосов
/ 04 октября 2018

Сервер RPC (то, что ожидает get) предназначен для сбоя, если он используется более одного раза или после перезапуска сельдерея .

результат можетбыть восстановленным только один раз, и только клиентом, который инициировал задачу.Два разных процесса не могут ожидать одного и того же результата.

По умолчанию сообщения являются временными (непостоянными), поэтому результаты перезапускаются при перезапуске посредника.Вы можете настроить бэкэнд результата на отправку постоянных сообщений, используя параметр result_persistent.

Таким образом, похоже, что это происходит из-за того, что исключение приводит к остановке celery и разрыву соединения rpc с вызывающим контроллером.Учитывая ваш вариант использования, может иметь смысл использовать постоянный бэкэнд результатов, например, redis или базу данных.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...