Моделирование процессов разветвления с помощью Apache Airflow - PullRequest
0 голосов
/ 04 октября 2018

У меня есть ряд задач Airflow, которые требуют некоторого разветвления и сбора результатов для массовой обработки, и мне трудно представить, как это должно работать.

Грубо говоря, я получаю списокфайлов, обработайте их индивидуально с помощью ряда задач преобразования, а затем загрузите их в базу данных.

Обзор задач выборки

  1. Выбор списка файлов JSON для загрузки
  2. Загрузка каждого файла JSON
  3. Для каждого файла начинается «обработка рабочего процесса»

Обзор задач (задач) «Обработка рабочего процесса»

  1. АнализФайл JSON
  2. Изменение формы данных JSON
  3. Запуск набора (без сохранения состояния) функций исправления ошибок для преобразованных данных JSON
  4. Вставка данных JSON в базу данных
  5. Запуск комплекта БД-уровневые функции для этих только что вставленных данных
  6. Запуск дополнительных функций уровня данных для данных с шага 5

Неясно, например, как начать весь «рабочий процесс обработки»"задачи для каждого файла изВ одной задаче.Должны ли такие объемные задачи быть подгруппой задач?Как бы вы смоделировали это?

1 Ответ

0 голосов
/ 05 октября 2018

Я справился с подобными проблемами в Airflow, используя 2 группы доступности базы данных.

DAG1: получение списка файлов JSON для загрузки (локальное хранение в простой среде или установка в NFS или общей точке монтирования в более сложную систему).среда)

DAG2: зацикливание списка файлов для создания задач, специфичных для каждого файла - Загрузить файл JSON - Разобрать файл JSON - Изменить данные JSON - Выполнить набор (без сохранения состояния) функций исправления ошибок для измененных данных JSON - ВставитьДанные JSON в базу данных. Выполнение функций уровня базы данных для этих только что вставленных данных. Выполнение дополнительных функций уровня базы данных для данных, начиная с шага 5

. Вот неполный фрагмент, показывающий, как зацикливаться на файле CSV и генерироватьЗагрузите и проанализируйте файлы JSON как задачи BashOperator.

dlJSON = {}
parseJSON = {}

all_tasks = DummyOperator(task_id='all_tasks', 
                          dag=dag)

with open(file_directory + metadata) as csvfile:
    reader = csv.DictReader(csvfile)
    rows = [_ for _ in reader if _]  # remove empty strings

    for row in rows:

        dlJSON[('dlJSON_{}'.format(row['file']) ] = BashOperator(
            task_id=('dlJSON_{}'.format(row['file'])),
            bash_command=templated_download,
            xcom_push=True,
            params={'file': row['file'],
                    'directory': file_directory,
                    'outfile': '{}.json'.format(row['file'])},
            dag=dag)

        parseJSON[('parseJSON_{}'.format(row['file']) ] = BashOperator(
            task_id=('parseJSON_{}'.format(row['file'])),
            bash_command=templated_parse,
            xcom_push=True,
            params={'file': row['file'],
                    'directory': file_directory,
                    'infile': '{}.json'.format(row['file'])},
                dag=dag)

        'dlJSON_{}'.format(row['file']) >> 'parseJSON_{}'.format(row['file']) >> all_tasks
...