Я использую сельдерей для обработки документов, используя рабочий процесс, который мне удалось смоделировать как задачу сокращения карты, см. Схему c ниже для обзора. В большинстве примеров в документации показано, как использовать группы и аккорды для реализации рабочих процессов, но во всех этих примерах шаг «генератора» является дешевым и может выполняться синхронно [ link ].
В моем случае это очень дорогая операция, которую я предпочел бы выполнить в фоновом режиме.
Вот небольшой фрагмент, иллюстрирующий мою настройку - ссылка на репо со связанной docker - композицией для настройки rabbitmq / postgres [ здесь ]:
# tasks.py
import os
import time
import celery
from celery import group, chord
from celery.utils.log import get_task_logger
app = celery.Celery(
__name__,
broker="amqp://{user}:{password}@{host}".format(
user="guest", password="guest", host=os.getenv("RABBIT", "localhost"), vhost=""
),
backend="db+postgresql+psycopg2://{user}:{password}@{host}/{database}".format(
user="postgres",
password="postgres",
host=os.getenv("PGHOST", "localhost"),
database="postgres",
),
)
logger = get_task_logger(__name__)
@app.task()
def gen(n):
""" Expensive generator function """
time.sleep(10)
return list(range(n))
@app.task()
def multiply(x):
""" Function used by the mapper """
return x * x
@app.task()
def reducer(numbers):
""" Simple reduce function """
time.sleep(10)
return sum(numbers)
@app.task()
def map_reduce(n):
""" Takes input that dynamically produces a generator that in turn, produce
input to a map-reduce job """
numbers = gen.s(n)
my_chord = chord(group(multiply.s(n) for n in numbers()), reducer.s())
result = my_chord()
return result
В идеале я хотел бы иметь возможность асинхронно вызывать функцию map_reduce
и возвращать результат аккорда в той же задаче , но самое близкое, что я получил, это анализ задачи, созданной задачей синтаксического анализа:
>>> from tasks import map_reduce
>>> task = map_reduce.apply_async(args=(10,))
>>> task.get()
(('dc2c36d4-c4ca-4032-a1a2-a4c90d78d7fe',
(('805d55fa-03a2-43f1-884c-97f58241bf3b', None),
[(('da0cb921-914a-4e5b-bc8b-b8c5dd6df050', None), None),
(('3f808f4b-777f-45bf-86a9-b3a5747d0d55', None), None),
(('fa3e15ee-7aa6-44ca-8138-664d332d5d63', None), None),
(('5c349280-9e0c-40bb-94b5-057605cf5c56', None), None),
(('88a1f18d-83fe-40c5-bbd1-00cc781ca7be', None), None),
(('1b027655-3065-4e15-aa46-8cb3e1f8afaf', None), None),
(('7813cab2-cbd4-4fe9-b7e2-8f0bf5399a32', None), None),
(('e1539d2a-2bd5-4df3-83a3-b11500c610c4', None), None),
(('919c52a8-102b-4ceb-89fb-bcbc7f1c98ab', None), None),
(('19fcfd0d-3799-4f97-8c29-86451a9da3f5', None), None)])),
None)
>>> celery.result.AsyncResult(task.get()[0][0]).get()
285
Есть ли лучший способ связать две задачи вместе, чтобы возвращаемый результат из map_reduce был результатом аккорда на основе динамически генерируемого входа?