Обработка списка миллионов файлов в Airflow параллельно - PullRequest
1 голос
/ 24 апреля 2019

У меня есть конвейер ETL, который состоит из следующих задач:

  1. сканирование веб-сайта для сбора URL-адресов в CSV-файлы
  2. загрузка всех файлов из URL-адресов шага 1
  3. обрабатывать каждый файл, загруженный таким образом в Шаге-2

У меня есть функция python для каждого из этих шагов, давайте назовем их {f1,f2,f3}

С multiprocessing модуль и Pool.map вызов, я могу запустить n processes для обоих f2, f3

Я конвертировал весь этот сценарий в DAG Airflow.Мой вопрос предполагает, что это масштаб в миллион файлов, и я хотел бы, особенно в Шаге 3, использовать весь мой кластер работников Celery для параллельной обработки файлов.

Например, скажем, у меня есть 100 рабочих узлов сельдерея, каждый из которых имеет 2 ядра, что дает мне 200 ядер - я бы хотел обработать, скажем, не менее 100 файлов параллельно.

Как мне это сделать?

1 Ответ

0 голосов
/ 24 апреля 2019

Воздушный поток может использоваться в тысячах динамических задач, но не должен. Предполагается, что DAG воздушного потока должны быть довольно постоянными. Например, вы все еще можете использовать Airflow, чтобы обрабатывать весь набор очищенных данных и позже использовать эту информацию в процессе ETL.

Большое количество динамических задач может привести к запуску DAG, например:

enter image description here

Что приводит к большому количеству информации о мусоре как в графическом интерфейсе, так и в файлах журналов.

Я рекомендую построить вашу систему задач поверх библиотеки Celery (не путайте с CeleryExecutor в Airflow, потому что Airflow можно использовать поверх Celery). Это очередь задач, ориентированная на миллионы задач реального времени:

Сельдерей используется в производственных системах для обработки миллионов задач в день.

Celery написан на Python, готов к работе, стабилен и невероятно масштабируем. Я думаю, что это лучший инструмент для решения вашей проблемы.


Но если вы действительно хотите использовать только Airflow, вы можете прочитать эту статью (о динамическом создании DAG) и эту статью (о динамическом создании задач внутри DAG).

...