Python Celery: Как мне объединить результаты задач не по порядку? - PullRequest
0 голосов
/ 20 февраля 2019

У меня есть простой проект, в котором я создаю кучу работ, которые не связаны друг с другом, создаю задачи, передаю их в Redis, и несколько рабочих распределяются по жеванию Docker Swarm через очередь длинныхзадачи.Когда рабочие заканчивают работу, они сбрасывают свою законченную работу в общий ресурс NFS и отправляют обратно текстовое значение клиенту Celery.

Я использую функцию celery.result.ResultSet .join () в массиве resultset asyncresult.объекты.В метод join () входит обратный вызов, который (на данный момент) просто печатает результат.

Моя проблема заключается в блоках join (), пока он не получит каждое значение асинхронного результата в указанном порядке.Мой рой состоит из множества хостов, которые являются совершенно разными машинами, и для меня важно, чтобы результаты возвращались по окончании, а не по порядку или после того, как все они завершены.

Есть ли способчерез Celery, чтобы правильно вызвать функцию обратного вызова, когда задачи завершены?Я просмотрел множество примеров в Интернете и, похоже, что мой единственный вариант - попытать счастья с помощью asyncio, но Python не совсем мой сильный набор.

Func для создания задач и ResultSet obj:

def populateQueue(encodeTasks):
r = ResultSet([])
taskHandles = {}

for task in encodeTasks:
    try:
        ret = encode.delay(task)
        r.add(ret)
        logging.debug("Task ID: " + str(ret.task_id))
        taskHandles[ret.task_id] = ret 
    except:
        logging.info("populateQueue fail: " + str(task.traceback))

logging.info("Tasks queued: " + str(len(taskHandles)))
return taskHandles, r

Часть main (), которая ожидает результатов:

        frameCountTotal = getFrameCount(targetFile)
        encodeTasks = buildCmdString(targetFile, frameCountTotal, clientCount)
        taskHandles, retSet = populateQueue(encodeTasks)

        logging.info("Waiting on tasks...")
        retSet.join(callback=testCallback)

Заранее спасибо

1 Ответ

0 голосов
/ 26 февраля 2019

Нашел ответ на мой собственный вопрос:

В ResultSet есть еще один метод, называемый join_native (), который, я думаю, использует более конкретные вызовы API для брокера, если этот брокер является одним из нескольких известных продуктов (RabbitMQ)., Redis и т. Д.).Документация Celery просто говорит, что она дает лучшую производительность, если вы отвечаете требованиям брокера.Чего не говорят документы, так это того, что он допускает возврат не по порядку (по крайней мере, на Redis, еще не пробовал RMQ).

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...