Как динамически генерировать входные данные для задачи «Сельдерейский аккорд» - PullRequest
0 голосов
/ 13 апреля 2020

Я использую сельдерей для обработки документов, используя рабочий процесс, который мне удалось смоделировать как задачу сокращения карты, см. Схему c ниже для обзора. В большинстве примеров в документации показано, как использовать группы и аккорды для реализации рабочих процессов, но во всех этих примерах шаг «генератора» является дешевым и может выполняться синхронно [ link ].

В моем случае это очень дорогая операция, которую я предпочел бы выполнить в фоновом режиме.

Вот небольшой фрагмент, иллюстрирующий мою настройку - ссылка на репо со связанной docker - композицией для настройки rabbitmq / postgres [ здесь ]:

# tasks.py

import os
import time
import celery
from celery import group, chord
from celery.utils.log import get_task_logger


app = celery.Celery(
    __name__,
    broker="amqp://{user}:{password}@{host}".format(
        user="guest", password="guest", host=os.getenv("RABBIT", "localhost"), vhost=""
    ),
    backend="db+postgresql+psycopg2://{user}:{password}@{host}/{database}".format(
        user="postgres",
        password="postgres",
        host=os.getenv("PGHOST", "localhost"),
        database="postgres",
    ),
)

logger = get_task_logger(__name__)


@app.task()
def gen(n):
    """ Expensive generator function """
    time.sleep(10)
    return list(range(n))


@app.task()
def multiply(x):
    """ Function used by the mapper """
    return x * x


@app.task()
def reducer(numbers):
    """ Simple reduce function """
    time.sleep(10)
    return sum(numbers)


@app.task()
def map_reduce(n):
    """ Takes input that dynamically produces a generator that in turn, produce
    input to a map-reduce job """

    numbers = gen.s(n)
    my_chord = chord(group(multiply.s(n) for n in numbers()), reducer.s())
    result = my_chord()

    return result

В идеале я хотел бы иметь возможность асинхронно вызывать функцию map_reduce и возвращать результат аккорда в той же задаче , но самое близкое, что я получил, это анализ задачи, созданной задачей синтаксического анализа:

>>> from tasks import map_reduce
>>> task = map_reduce.apply_async(args=(10,))
>>> task.get()
(('dc2c36d4-c4ca-4032-a1a2-a4c90d78d7fe',
  (('805d55fa-03a2-43f1-884c-97f58241bf3b', None),
   [(('da0cb921-914a-4e5b-bc8b-b8c5dd6df050', None), None),
    (('3f808f4b-777f-45bf-86a9-b3a5747d0d55', None), None),
    (('fa3e15ee-7aa6-44ca-8138-664d332d5d63', None), None),
    (('5c349280-9e0c-40bb-94b5-057605cf5c56', None), None),
    (('88a1f18d-83fe-40c5-bbd1-00cc781ca7be', None), None),
    (('1b027655-3065-4e15-aa46-8cb3e1f8afaf', None), None),
    (('7813cab2-cbd4-4fe9-b7e2-8f0bf5399a32', None), None),
    (('e1539d2a-2bd5-4df3-83a3-b11500c610c4', None), None),
    (('919c52a8-102b-4ceb-89fb-bcbc7f1c98ab', None), None),
    (('19fcfd0d-3799-4f97-8c29-86451a9da3f5', None), None)])),
 None)
>>> celery.result.AsyncResult(task.get()[0][0]).get()
285

Есть ли лучший способ связать две задачи вместе, чтобы возвращаемый результат из map_reduce был результатом аккорда на основе динамически генерируемого входа?

Schematic workflow

1 Ответ

0 голосов
/ 13 апреля 2020

вы можете использовать класс AsyncResult https://docs.celeryproject.org/en/stable/reference/celery.result.html

task_id = task.get()[0][0]
print(celery.result.AsyncResult(task_id).get())
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...