Celery - подпись задачи преобразована в словарь и не может вызывать аккорд - использование цепочки задач в группе заголовков аккорда - PullRequest
0 голосов
/ 24 марта 2020

После ответа на аккорд сельдерея с группой цепочек с исключением в цепочке

Я получаю ошибку, когда кажется, что Celery меняет цепочку подписей задач в словарь, вызывая ошибка ниже.

Сельдерей - 4.4 Redis - 3.3.11

Код - от @ bigzbig

@celery_app.task
def task_one():
    return 'OKIDOKI'

@celery_app.task
def task_two(str):
    return f'{str} YOUPI'

@celery_app.task
def task_three(str):
    return f'{str} MAKAPAKA'

@celery_app.task
def task_exception(str):
    raise KeyError(f'{str} Ups')

@celery_app.task(ignore_result=True)
def task_wrapper(*args, **kwargs):
    if 'job' in kwargs:
        kwargs['job'].apply()

@celery_app.task(ignore_result=True)
def callback_task(*args, **kwargs):
    return (args, kwargs, 'Yeah')

def test():
    chains = []

    tasks = [
        task_one.s(),
        task_two.s(),
        task_exception.s(),
        task_three.s(),
    ]
    chains.append(task_wrapper.s(job=chain(*tasks)))

    tasks = [
        task_one.s(),
        task_two.s(),
        task_three.s(),
    ]
    chains.append(task_wrapper.s(job=chain(*tasks)))

    chord(chains, callback_task.s()).apply_async()

Печать kwargs [ 'job']

celeryworker2_1  | [2020-03-23 22:31:01,646: WARNING/ForkPoolWorker-1] {'task': 'celery.chain', 'args': [], 'kwargs': {'tasks': [{'task': 'portfolio.tasks.task_one', 'args': [], 'kwargs': {}, 'options': {}, 'subtask_type': None, 'chord_size': None, 'immutable': False}, {'task': 'portfolio.tasks.task_two', 'args': [], 'kwargs': {}, 'options': {}, 'subtask_type': None, 'chord_size': None, 'immutable': False}, {'task': 'portfolio.tasks.task_exception', 'args': [], 'kwargs': {}, 'options': {}, 'subtask_type': None, 'chord_size': None, 'immutable': False}, {'task': 'portfolio.tasks.task_three', 'args': [], 'kwargs': {}, 'options': {}, 'subtask_type': None, 'immutable': False, 'chord_size': None}]}, 'options': {}, 'subtask_type': 'chain', 'immutable': False, 'chord_size': None}
celeryworker_1   | [2020-03-23 22:31:01,650: WARNING/ForkPoolWorker-1] {'task': 'celery.chain', 'args': [], 'kwargs': {'tasks': [{'task': 'portfolio.tasks.task_one', 'args': [], 'kwargs': {}, 'options': {}, 'subtask_type': None, 'chord_size': None, 'immutable': False}, {'task': 'portfolio.tasks.task_two', 'args': [], 'kwargs': {}, 'options': {}, 'subtask_type': None, 'chord_size': None, 'immutable': False}, {'task': 'portfolio.tasks.task_three', 'args': [], 'kwargs': {}, 'options': {}, 'subtask_type': None, 'immutable': False, 'chord_size': None}]}, 'options': {}, 'subtask_type': 'chain', 'immutable': False, 'chord_size': None}

Ошибка

Traceback (most recent call last):
File "/usr/local/lib/python3.6/site-packages/celery/app/trace.py", line 385, in trace_task
R = retval = fun(*args, **kwargs)
File "/usr/local/lib/python3.6/site-packages/celery/app/trace.py", line 650, in __protected_call__
return self.run(*args, **kwargs)
File "/app/portfolio/tasks.py", line 241, in task_wrapper
kwargs['job'].apply()
AttributeError: 'dict' object has no attribute 'apply'
...