Я пишу конвейер обработки данных с использованием Celery, потому что это значительно ускоряет процесс.
Рассмотрим следующий псевдокод:
from celery.result import ResultSet
from some_celery_app import processing_task # of type @app.task
def crunch_data():
results = ResultSet([])
for document in mongo.find(): #Around 100K - 1M documents
job = processing_task.delay(document)
results.add(job)
return results.get()
collected_data = crunch_data()
#Do some stuff with this collected data
Я успешно породил четырех рабочих с включенным параллелизмоми когда я запускаю этот скрипт, данные обрабатываются соответствующим образом, и я могу делать все, что захочу.
Я использую RabbitMQ в качестве посредника сообщений и rpc
в качестве бэкэнда.
Что я вижукогда я открываю пользовательский интерфейс управления RabbitMQ:
- Сначала обрабатываются все документы
- , а затем и только тогда - документы, полученные коллективным вызовом
results.get()
.
Мой вопрос: есть ли способ выполнить обработку и последующий поиск одновременно?В моем случае, поскольку все документы являются атомарными объектами, которые не зависят друг от друга, похоже, нет необходимости ждать полной обработки задания.