Неблокирующая цепочка сельдерея - PullRequest
0 голосов
/ 19 октября 2018

У меня есть 30k + RSS-каналы в моей базе данных.Я использую feedparser для разбора и извлечения новых записей из каждого канала RSS. После того, как я получу список новых записей, мне нужно проверить их и сохранить в БД.В настоящее время я думаю о написании 3-х отдельных задач сельдерея: Задача 1, которая будет анализировать каждый URL-адрес, и везде, где есть новая запись, заполнять объект класса Python.Задача 2 примет объект и проверит его. Если объект действителен, он вернет тот же объект или же вернет и ErrorObject, содержащий исходный объект и подробности ошибки.Задача 3 примет любую из двух и в зависимости от типа объекта сохранит ее в БД

. В основном, когда я запускаю скрипт python, он получает все URL-адреса из БД и затем передает каждый URL-адрес Задаче 1, ответиз которого следует автоматически перейти к Задаче 2, а затем ответ из Задачи 2 должен перейти к Задаче 3. Я не хочу блокировать Задачу 2 / Задачу 3, и они должны автоматически запускаться, как только одна запись обрабатывается из предыдущей задачи.Я использую следующий код для передачи всех URL в задачу 1, но не уверен, как обеспечить, чтобы задача 2 не блокировалась, пока задача 1 все еще обрабатывает 30 000+ URL-адресов

rss_urls = db.session.query(RSSfeed).filter(RSSfeed.rss_url != None).all()
jobs = group(rss_parse.s(item) for item.rss_url in rss_urls)
result = jobs.apply_async()

Я использую Celery с RabbitMQ в качестве брокера

Могу ли я сделать это так?

@celery.task
def rss_parse(rss_url):
  feed_entry_obj = get_new_entries_from_rss_feed(rss_url)
  task2.apply_async(feed_entry_obj)

@celery.task
def task2(feed_entry_obj):
  validation_obj = validate(feed_entry_obj)
  task3.apply_async(validation_obj)

@celery.task
def task3(validation_obj):
  save_in_db(validation_obj)
...