Сельдерейский аккорд не выпускает канал redis pubsub после apply_async - PullRequest
0 голосов
/ 31 мая 2018

Я запускаю chord из сельдерея внутри моего приложения django в ответ на запрос.Аккорд исполняется правильно, но django никогда не освобождает канал pub-sub.Уничтожение сервера django освобождает канал и затем исчезает из redis-cli pubsub channels.

  • Celery 4.1.1 или 4.2.0rc4
  • Redis 4.0.9
  • Python2.7.15
  • Работает локально, 1 работник сельдерея, 1 сервер API
  • В этом случае результаты не имеют значения (но в документах говорится, что они не должны игнорироваться)
  • Полный пример проектаat: https://github.com/awbacker/celerychord-issue

После нажатия /api/start/ и наблюдения за выполнением задач на вкладке «Запуск сельдерея» я вижу 5 оставшихся каналов.Убийство Джанго удаляет каналы, убийство сельдерея не влияет на них.

redis-cli pubsub channels
1) "celery-task-meta-chord-lphsmq-chunk-4-14"
2) "celery-task-meta-chord-lphsmq-chunk-2-12"
3) "celery-task-meta-chord-lphsmq-chunk-3-13"
4) "celery-task-meta-chord-lphsmq-chunk-1-11"
5) "celery-task-meta-chord-lphsmq-chunk-0-10"

Я вижу, что каналы сохраняются, когда все идет хорошо, поэтому никаких ошибок не выдается.

Кто-нибудь может увидеть что-то, что я делаю не так?Я знаю, что в сельдерее сообщается о нескольких проблемах, но я не уверен, следует ли из них следующее:

код:

# --- endpoint.py -------------------------------------------
chord_key = get_random_string(6, string.ascii_lowercase)
all_tasks = celery.chord(
    task_id="chord-%s" % chord_key,
    header=celery.group(
        tasks.process_chunk.subtask(args=(x,), task_id="chord-%s-chunk-%s-%s" % (chord_key, i, x))
        for i, x in enumerate(range(10, 15))
    ),
    # immutable = ignore results from parent
    body=celery.chain(
        tasks.post_step_1.subtask(args=(20,), task_id="chord-%s-post-1" % chord_key, immutable=True),
        tasks.post_step_2.subtask(args=(20,), task_id="chord-%s-post-1" % chord_key, immutable=True),
    )
)
result = all_tasks.apply_async()
return Response(data=dict(chord_key=chord_key, result=repr(result)))

# --- tasks.py ----------------------------------------------
@celery_app.task(bind=True, ignore_result=False)
def process_chunk(self, x):
    logging.error(" ~ executing process-chunk: %s" % x)
    return x * 2


@celery_app.task(bind=True, ignore_result=False)
def post_step_1(self, y):
    logging.error(" ~ executing post-step-1")
    return y * 3


@celery_app.task(bind=True, ignore_result=False)
def post_step_2(self, z):
    logging.error(" ~ executing post-step-2")
    return z * 5

1 Ответ

0 голосов
/ 31 мая 2018

Ваш Аккорд выглядит очень сложным, может быть, поэтому сельдерею не так просто, я бы предложил реализовать логику аккордов самостоятельно, это не очень сложно.попробуйте это вместо этого ... я в основном жду задач, основанных на использовании механизма аккордов

# --- endpoint.py ------------------------------------------- 
chain_tasks = celery.chain(
            tasks.post_step_1.subtask(args=(20,), task_id="chord-%s-post-1" % chord_key, immutable=True),
            tasks.post_step_2.subtask(args=(20,), task_id="chord-%s-post-1" % chord_key, immutable=True)
            .apply_async()
chain_result= chain_tasks.get() // WAIT TO FINISH

group_task = celery.group(tasks.process_chunk.subtask(args=(chain_result,), task_id="chord-%s-chunk-%s-%s" % (chord_key, i, x))
            for i, x in enumerate(range(10, 15)).apply_async()
group_result = group_task.get()
return Response(data=dict(chord_key=chord_key, result=repr(group_result)))

, не уверен, что это именно то, что вы пытались достичь, но я думаю, что с несколькими настройками это будет работать,удачи.

...