Я запускаю chord
из сельдерея внутри моего приложения django
в ответ на запрос.Аккорд исполняется правильно, но django никогда не освобождает канал pub-sub.Уничтожение сервера django освобождает канал и затем исчезает из redis-cli pubsub channels
.
- Celery 4.1.1 или 4.2.0rc4
- Redis 4.0.9
- Python2.7.15
- Работает локально, 1 работник сельдерея, 1 сервер API
- В этом случае результаты не имеют значения (но в документах говорится, что они не должны игнорироваться)
- Полный пример проектаat: https://github.com/awbacker/celerychord-issue
После нажатия /api/start/
и наблюдения за выполнением задач на вкладке «Запуск сельдерея» я вижу 5 оставшихся каналов.Убийство Джанго удаляет каналы, убийство сельдерея не влияет на них.
redis-cli pubsub channels
1) "celery-task-meta-chord-lphsmq-chunk-4-14"
2) "celery-task-meta-chord-lphsmq-chunk-2-12"
3) "celery-task-meta-chord-lphsmq-chunk-3-13"
4) "celery-task-meta-chord-lphsmq-chunk-1-11"
5) "celery-task-meta-chord-lphsmq-chunk-0-10"
Я вижу, что каналы сохраняются, когда все идет хорошо, поэтому никаких ошибок не выдается.
Кто-нибудь может увидеть что-то, что я делаю не так?Я знаю, что в сельдерее сообщается о нескольких проблемах, но я не уверен, следует ли из них следующее:
код:
# --- endpoint.py -------------------------------------------
chord_key = get_random_string(6, string.ascii_lowercase)
all_tasks = celery.chord(
task_id="chord-%s" % chord_key,
header=celery.group(
tasks.process_chunk.subtask(args=(x,), task_id="chord-%s-chunk-%s-%s" % (chord_key, i, x))
for i, x in enumerate(range(10, 15))
),
# immutable = ignore results from parent
body=celery.chain(
tasks.post_step_1.subtask(args=(20,), task_id="chord-%s-post-1" % chord_key, immutable=True),
tasks.post_step_2.subtask(args=(20,), task_id="chord-%s-post-1" % chord_key, immutable=True),
)
)
result = all_tasks.apply_async()
return Response(data=dict(chord_key=chord_key, result=repr(result)))
# --- tasks.py ----------------------------------------------
@celery_app.task(bind=True, ignore_result=False)
def process_chunk(self, x):
logging.error(" ~ executing process-chunk: %s" % x)
return x * 2
@celery_app.task(bind=True, ignore_result=False)
def post_step_1(self, y):
logging.error(" ~ executing post-step-1")
return y * 3
@celery_app.task(bind=True, ignore_result=False)
def post_step_2(self, z):
logging.error(" ~ executing post-step-2")
return z * 5