Работник завершает работу сразу после получения задания из очереди redis. - PullRequest
0 голосов
/ 11 июля 2019

Я пытаюсь выполнять задачи асинхронно с Celery.Я искал повсюду в Интернете, и я еще не нашел свою ошибку точно в Интернете.Я бегу сельдерея 4.3 из док-контейнера.

Я уже пытался выполнить обновление до версии-кандидата 4.4, хотя ошибка сохраняется.Я думал, что, возможно, этот запрос на получение решит мои проблемы, но это не так.

У меня также есть подозрение, что, возможно, проблема в sql-алхимии, но я не уверен почему.У меня есть другие функции, которые работают при асинхронном вызове, но по какой-то причине, когда я использую этот формат, мой код не запускается.

Я также попытался установить параметр задачи bind=True, который не работал.

import sqlalchemy as sa
import pandas as pd

class MyTask(Task):

    """
    Custom extension of celery.Task that sends email alerts when task failure occurs.
    """

    def on_failure(self, exc, task_id, args, kwargs, einfo):
        # Send error email.
        now = dt.datetime.now()
        to = ["it@guy.com"]
        subject = f"DSJOBS Error - {now:}!"
        template = JINJA2_ENVIRONMENT.get_template("email/error.html")
        text = template.render(
            args=args,
            einfo=einfo,
            exc=exc,
            kwargs=kwargs,
            task_id=task_id,
            title=str(type(einfo)),
        )
        send_email(to, subject, text)
        # Perform inherited actions on failure.
        super().on_failure(exc, task_id, args, kwargs, einfo)


@APP.task(base=MyTask, 
    name="dsjobs.tasks.get_keywords", 
    time_limit=12 * 3_600,)
def get_keywords(date):
  engine = sa.create_engine(**db_creds_here)
  df = pd.DataFrame(engine.query(f'select * from sometable where date = "{date}"'))
  #[do some work on df]
  return df

Когда я вызываю такие функции как обычные функции get_keywords(d1) из celery shell,мой код работает просто отлично, и я получаю ожидаемые результаты.

Проблема заключается в том, что я пытаюсь использовать базовые функции сельдерея в асинхронных задачах.Когда я использую get_keywords.delay(d1), задание отправляется в мою очередь повторного выполнения, и затем мой работник возвращает эту ошибку:

[2019-07-11 12:28:32,839: ERROR/MainProcess] Error on stopping Pool: TypeError("__init__() missing 1 required positional argument: 'name'")
Traceback (most recent call last):
  File "/opt/conda/lib/python3.7/site-packages/celery/worker/worker.py", line 205, in start
    self.blueprint.start(self)
  File "/opt/conda/lib/python3.7/site-packages/celery/bootsteps.py", line 119, in start
    step.start(parent)
  File "/opt/conda/lib/python3.7/site-packages/celery/bootsteps.py", line 369, in start
    return self.obj.start()
  File "/opt/conda/lib/python3.7/site-packages/celery/worker/consumer/consumer.py", line 318, in start
    blueprint.start(self)
  File "/opt/conda/lib/python3.7/site-packages/celery/bootsteps.py", line 119, in start
    step.start(parent)
  File "/opt/conda/lib/python3.7/site-packages/celery/bootsteps.py", line 369, in start
    return self.obj.start()
  File "/opt/conda/lib/python3.7/site-packages/celery/worker/consumer/consumer.py", line 318, in start
    blueprint.start(self)
  File "/opt/conda/lib/python3.7/site-packages/celery/bootsteps.py", line 119, in start
    step.start(parent)
  File "/opt/conda/lib/python3.7/site-packages/celery/worker/consumer/consumer.py", line 596, in start
    c.loop(*c.loop_args())
  File "/opt/conda/lib/python3.7/site-packages/celery/worker/loops.py", line 91, in asynloop
    next(loop)
  File "/opt/conda/lib/python3.7/site-packages/kombu/asynchronous/hub.py", line 362, in create_loop
    cb(*cbargs)
  File "/opt/conda/lib/python3.7/site-packages/celery/concurrency/asynpool.py", line 300, in on_result_readable
    next(it)
  File "/opt/conda/lib/python3.7/site-packages/celery/concurrency/asynpool.py", line 281, in _recv_message
    message = load(bufv)
TypeError: __init__() missing 1 required positional argument: 'name'

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/opt/conda/lib/python3.7/site-packages/celery/bootsteps.py", line 151, in send_all
    fun(parent, *args)
  File "/opt/conda/lib/python3.7/site-packages/celery/bootsteps.py", line 373, in stop
    return self.obj.stop()
  File "/opt/conda/lib/python3.7/site-packages/celery/concurrency/base.py", line 122, in stop
    self.on_stop()
  File "/opt/conda/lib/python3.7/site-packages/celery/concurrency/prefork.py", line 145, in on_stop
    self._pool.join()
  File "/opt/conda/lib/python3.7/site-packages/billiard/pool.py", line 1581, in join
    stop_if_not_current(self._result_handler)
  File "/opt/conda/lib/python3.7/site-packages/billiard/pool.py", line 143, in stop_if_not_current
    thread.stop(timeout)
  File "/opt/conda/lib/python3.7/site-packages/billiard/pool.py", line 500, in stop
    self.on_stop_not_started()
  File "/opt/conda/lib/python3.7/site-packages/celery/concurrency/asynpool.py", line 336, in on_stop_not_started
    on_state_change,
  File "/opt/conda/lib/python3.7/site-packages/celery/concurrency/asynpool.py", line 360, in _flush_outqueue
    task = reader.recv()
  File "/opt/conda/lib/python3.7/site-packages/billiard/connection.py", line 281, in recv
    return ForkingPickler.loadbuf(buf)
  File "/opt/conda/lib/python3.7/site-packages/billiard/reduction.py", line 61, in loadbuf
    return cls.loads(buf.getbuffer())
TypeError: __init__() missing 1 required positional argument: 'name'

Заранее благодарим вас за помощь!

...