Как я могу асинхронно получать обработанные сообщения в Celery? - PullRequest
0 голосов
/ 28 сентября 2018

Я пишу конвейер обработки данных с использованием Celery, потому что это значительно ускоряет процесс.

Рассмотрим следующий псевдокод:


    from celery.result import ResultSet
    from some_celery_app import processing_task # of type @app.task

    def crunch_data():
        results = ResultSet([])
        for document in mongo.find(): #Around 100K - 1M documents
            job = processing_task.delay(document)
            results.add(job)

        return results.get()

    collected_data = crunch_data()
    #Do some stuff with this collected data

Я успешно породил четырех рабочих с включенным параллелизмоми когда я запускаю этот скрипт, данные обрабатываются соответствующим образом, и я могу делать все, что захочу.

Я использую RabbitMQ в качестве посредника сообщений и rpc в качестве бэкэнда.

Что я вижукогда я открываю пользовательский интерфейс управления RabbitMQ:

  1. Сначала обрабатываются все документы
  2. , а затем и только тогда - документы, полученные коллективным вызовом results.get().

Мой вопрос: есть ли способ выполнить обработку и последующий поиск одновременно?В моем случае, поскольку все документы являются атомарными объектами, которые не зависят друг от друга, похоже, нет необходимости ждать полной обработки задания.

1 Ответ

0 голосов
/ 02 октября 2018

Вы можете попробовать параметр обратного вызова в ResultSet.get(callback=cbResult), а затем обработать результат в обратном вызове.

def cbResult(task_id, value):
  print(value)
results.get(callback=cbResult)
...