У меня есть 30k + RSS-каналы в моей базе данных.Я использую feedparser для разбора и извлечения новых записей из каждого канала RSS. После того, как я получу список новых записей, мне нужно проверить их и сохранить в БД.В настоящее время я думаю о написании 3-х отдельных задач сельдерея: Задача 1, которая будет анализировать каждый URL-адрес, и везде, где есть новая запись, заполнять объект класса Python.Задача 2 примет объект и проверит его. Если объект действителен, он вернет тот же объект или же вернет и ErrorObject, содержащий исходный объект и подробности ошибки.Задача 3 примет любую из двух и в зависимости от типа объекта сохранит ее в БД
. В основном, когда я запускаю скрипт python, он получает все URL-адреса из БД и затем передает каждый URL-адрес Задаче 1, ответиз которого следует автоматически перейти к Задаче 2, а затем ответ из Задачи 2 должен перейти к Задаче 3. Я не хочу блокировать Задачу 2 / Задачу 3, и они должны автоматически запускаться, как только одна запись обрабатывается из предыдущей задачи.Я использую следующий код для передачи всех URL в задачу 1, но не уверен, как обеспечить, чтобы задача 2 не блокировалась, пока задача 1 все еще обрабатывает 30 000+ URL-адресов
rss_urls = db.session.query(RSSfeed).filter(RSSfeed.rss_url != None).all()
jobs = group(rss_parse.s(item) for item.rss_url in rss_urls)
result = jobs.apply_async()
Я использую Celery с RabbitMQ в качестве брокера
Могу ли я сделать это так?
@celery.task
def rss_parse(rss_url):
feed_entry_obj = get_new_entries_from_rss_feed(rss_url)
task2.apply_async(feed_entry_obj)
@celery.task
def task2(feed_entry_obj):
validation_obj = validate(feed_entry_obj)
task3.apply_async(validation_obj)
@celery.task
def task3(validation_obj):
save_in_db(validation_obj)