как предоставить цепочку в качестве обратного вызова для аккорда сельдерея, когда эта цепочка должна выполняться при успехе или неудаче аккорда - PullRequest
1 голос
/ 08 мая 2020

После этого переполнения стека ответа , вот игрушечный пример, который выполняет обратный вызов для аккорда сельдерея, независимо от того, был ли заголовок успешным.

tasks.py:

from celery import group, chord, Celery

cel = Celery(__name__,
                broker='redis://localhost:6379',
                backend='redis://localhost:6379')

@cel.task
def add(a, b):
    print("adding {} + {}".format(a, b))
    return a + b

@cel.task
def mult(a, b):
    print("multiplying {} * {}".format(a, b))
    return a * b

@cel.task
def div(a, b):
    print("dividing {} by {}".format(a, b))
    return a / b

@cel.task
def subtract(a, b):
    print("subtracting {} from {}".format(b, a))
    return a - b

@cel.task
def div_with_err(a, b):
    print("This task was called with arguments {} and {} and should raise an error.".format(a, b))
    return 1/0

@cel.task(name='tasks.callback')
def callback(*args, **kwargs):
    print("Callback is executing")
    print("args are ", args)
    print("kwargs are ", kwargs)


group_foo = group([subtract.s(1, 3), add.s(2, 4)])

chain_bar = mult.s(1, 3) | div.s(4)

chain_with_err = mult.s(1, 3) | div_with_err.s(4)

header = group([group_foo, chain_with_err])

callback_baz = callback.s()

callback_baz.set(link_error=['tasks.callback'])

job = chord(header, callback_baz)

job.apply_async()

Но я застрял на том, что мне действительно нужно, а именно на том, чтобы обратный вызов был цепочкой, которая выполняется независимо от того, успешно или нет аккорд. . Вот кое-что, что я попробовал очень наивно, не совсем ожидая, что это сработает:

@cel.task(name='tasks.callback')
def callback_task_1(*args, **kwargs):
    print("Callback is executing")
    print("args are ", args)
    print("kwargs are ", kwargs)
    return 1 + 1

@cel.task
def callback_task_2(a):
    print("Callback task 2 is executing; received {} from callback task 1".format(a))

callback_chain = chain(callback_task_1.s(), callback_task_2.s(), name='callback_chain')

group_foo = group([subtract.s(1, 3), add.s(2, 4)])

chain_bar = mult.s(1, 3) | div.s(4)

chain_with_err = mult.s(1, 3) | div_with_err.s(4)

header = group([group_foo, chain_with_err])

callback_baz = callback_task_1.s()

callback_chain.set(link_error=['callback_chain'])

job = chord(header, callback_chain)

job.apply_async()

, но этого не произошло. Не работает с

[2020-05-08 13:21:15,480: ERROR/MainProcess] Received unregistered task of type 'callback_chain'.
The message has been ignored and discarded.

Может ли кто-нибудь предложить другой подход? Если хуже будет в моем реальном приложении, я мог бы сделать функции, представленные callback_chain, одной большой задачей, но это действительно было бы обломом. Например, последняя задача в цепочке - это задача send_email, которая используется во многих местах приложения, и я не хочу дублировать все ее функции в специальной задаче только для этого случая.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...