Я пытаюсь выполнять задачи асинхронно с Celery.Я искал повсюду в Интернете, и я еще не нашел свою ошибку точно в Интернете.Я бегу сельдерея 4.3 из док-контейнера.
Я уже пытался выполнить обновление до версии-кандидата 4.4, хотя ошибка сохраняется.Я думал, что, возможно, этот запрос на получение решит мои проблемы, но это не так.
У меня также есть подозрение, что, возможно, проблема в sql-алхимии, но я не уверен почему.У меня есть другие функции, которые работают при асинхронном вызове, но по какой-то причине, когда я использую этот формат, мой код не запускается.
Я также попытался установить параметр задачи bind=True
, который не работал.
import sqlalchemy as sa
import pandas as pd
class MyTask(Task):
"""
Custom extension of celery.Task that sends email alerts when task failure occurs.
"""
def on_failure(self, exc, task_id, args, kwargs, einfo):
# Send error email.
now = dt.datetime.now()
to = ["it@guy.com"]
subject = f"DSJOBS Error - {now:}!"
template = JINJA2_ENVIRONMENT.get_template("email/error.html")
text = template.render(
args=args,
einfo=einfo,
exc=exc,
kwargs=kwargs,
task_id=task_id,
title=str(type(einfo)),
)
send_email(to, subject, text)
# Perform inherited actions on failure.
super().on_failure(exc, task_id, args, kwargs, einfo)
@APP.task(base=MyTask,
name="dsjobs.tasks.get_keywords",
time_limit=12 * 3_600,)
def get_keywords(date):
engine = sa.create_engine(**db_creds_here)
df = pd.DataFrame(engine.query(f'select * from sometable where date = "{date}"'))
#[do some work on df]
return df
Когда я вызываю такие функции как обычные функции get_keywords(d1)
из celery shell
,мой код работает просто отлично, и я получаю ожидаемые результаты.
Проблема заключается в том, что я пытаюсь использовать базовые функции сельдерея в асинхронных задачах.Когда я использую get_keywords.delay(d1)
, задание отправляется в мою очередь повторного выполнения, и затем мой работник возвращает эту ошибку:
[2019-07-11 12:28:32,839: ERROR/MainProcess] Error on stopping Pool: TypeError("__init__() missing 1 required positional argument: 'name'")
Traceback (most recent call last):
File "/opt/conda/lib/python3.7/site-packages/celery/worker/worker.py", line 205, in start
self.blueprint.start(self)
File "/opt/conda/lib/python3.7/site-packages/celery/bootsteps.py", line 119, in start
step.start(parent)
File "/opt/conda/lib/python3.7/site-packages/celery/bootsteps.py", line 369, in start
return self.obj.start()
File "/opt/conda/lib/python3.7/site-packages/celery/worker/consumer/consumer.py", line 318, in start
blueprint.start(self)
File "/opt/conda/lib/python3.7/site-packages/celery/bootsteps.py", line 119, in start
step.start(parent)
File "/opt/conda/lib/python3.7/site-packages/celery/bootsteps.py", line 369, in start
return self.obj.start()
File "/opt/conda/lib/python3.7/site-packages/celery/worker/consumer/consumer.py", line 318, in start
blueprint.start(self)
File "/opt/conda/lib/python3.7/site-packages/celery/bootsteps.py", line 119, in start
step.start(parent)
File "/opt/conda/lib/python3.7/site-packages/celery/worker/consumer/consumer.py", line 596, in start
c.loop(*c.loop_args())
File "/opt/conda/lib/python3.7/site-packages/celery/worker/loops.py", line 91, in asynloop
next(loop)
File "/opt/conda/lib/python3.7/site-packages/kombu/asynchronous/hub.py", line 362, in create_loop
cb(*cbargs)
File "/opt/conda/lib/python3.7/site-packages/celery/concurrency/asynpool.py", line 300, in on_result_readable
next(it)
File "/opt/conda/lib/python3.7/site-packages/celery/concurrency/asynpool.py", line 281, in _recv_message
message = load(bufv)
TypeError: __init__() missing 1 required positional argument: 'name'
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/opt/conda/lib/python3.7/site-packages/celery/bootsteps.py", line 151, in send_all
fun(parent, *args)
File "/opt/conda/lib/python3.7/site-packages/celery/bootsteps.py", line 373, in stop
return self.obj.stop()
File "/opt/conda/lib/python3.7/site-packages/celery/concurrency/base.py", line 122, in stop
self.on_stop()
File "/opt/conda/lib/python3.7/site-packages/celery/concurrency/prefork.py", line 145, in on_stop
self._pool.join()
File "/opt/conda/lib/python3.7/site-packages/billiard/pool.py", line 1581, in join
stop_if_not_current(self._result_handler)
File "/opt/conda/lib/python3.7/site-packages/billiard/pool.py", line 143, in stop_if_not_current
thread.stop(timeout)
File "/opt/conda/lib/python3.7/site-packages/billiard/pool.py", line 500, in stop
self.on_stop_not_started()
File "/opt/conda/lib/python3.7/site-packages/celery/concurrency/asynpool.py", line 336, in on_stop_not_started
on_state_change,
File "/opt/conda/lib/python3.7/site-packages/celery/concurrency/asynpool.py", line 360, in _flush_outqueue
task = reader.recv()
File "/opt/conda/lib/python3.7/site-packages/billiard/connection.py", line 281, in recv
return ForkingPickler.loadbuf(buf)
File "/opt/conda/lib/python3.7/site-packages/billiard/reduction.py", line 61, in loadbuf
return cls.loads(buf.getbuffer())
TypeError: __init__() missing 1 required positional argument: 'name'
Заранее благодарим вас за помощь!