У меня есть простой проект, в котором я создаю кучу работ, которые не связаны друг с другом, создаю задачи, передаю их в Redis, и несколько рабочих распределяются по жеванию Docker Swarm через очередь длинныхзадачи.Когда рабочие заканчивают работу, они сбрасывают свою законченную работу в общий ресурс NFS и отправляют обратно текстовое значение клиенту Celery.
Я использую функцию celery.result.ResultSet .join () в массиве resultset asyncresult.объекты.В метод join () входит обратный вызов, который (на данный момент) просто печатает результат.
Моя проблема заключается в блоках join (), пока он не получит каждое значение асинхронного результата в указанном порядке.Мой рой состоит из множества хостов, которые являются совершенно разными машинами, и для меня важно, чтобы результаты возвращались по окончании, а не по порядку или после того, как все они завершены.
Есть ли способчерез Celery, чтобы правильно вызвать функцию обратного вызова, когда задачи завершены?Я просмотрел множество примеров в Интернете и, похоже, что мой единственный вариант - попытать счастья с помощью asyncio, но Python не совсем мой сильный набор.
Func для создания задач и ResultSet obj:
def populateQueue(encodeTasks):
r = ResultSet([])
taskHandles = {}
for task in encodeTasks:
try:
ret = encode.delay(task)
r.add(ret)
logging.debug("Task ID: " + str(ret.task_id))
taskHandles[ret.task_id] = ret
except:
logging.info("populateQueue fail: " + str(task.traceback))
logging.info("Tasks queued: " + str(len(taskHandles)))
return taskHandles, r
Часть main (), которая ожидает результатов:
frameCountTotal = getFrameCount(targetFile)
encodeTasks = buildCmdString(targetFile, frameCountTotal, clientCount)
taskHandles, retSet = populateQueue(encodeTasks)
logging.info("Waiting on tasks...")
retSet.join(callback=testCallback)
Заранее спасибо