Я справился с подобными проблемами в Airflow, используя 2 группы доступности базы данных.
DAG1: получение списка файлов JSON для загрузки (локальное хранение в простой среде или установка в NFS или общей точке монтирования в более сложную систему).среда)
DAG2: зацикливание списка файлов для создания задач, специфичных для каждого файла - Загрузить файл JSON - Разобрать файл JSON - Изменить данные JSON - Выполнить набор (без сохранения состояния) функций исправления ошибок для измененных данных JSON - ВставитьДанные JSON в базу данных. Выполнение функций уровня базы данных для этих только что вставленных данных. Выполнение дополнительных функций уровня базы данных для данных, начиная с шага 5
. Вот неполный фрагмент, показывающий, как зацикливаться на файле CSV и генерироватьЗагрузите и проанализируйте файлы JSON как задачи BashOperator.
dlJSON = {}
parseJSON = {}
all_tasks = DummyOperator(task_id='all_tasks',
dag=dag)
with open(file_directory + metadata) as csvfile:
reader = csv.DictReader(csvfile)
rows = [_ for _ in reader if _] # remove empty strings
for row in rows:
dlJSON[('dlJSON_{}'.format(row['file']) ] = BashOperator(
task_id=('dlJSON_{}'.format(row['file'])),
bash_command=templated_download,
xcom_push=True,
params={'file': row['file'],
'directory': file_directory,
'outfile': '{}.json'.format(row['file'])},
dag=dag)
parseJSON[('parseJSON_{}'.format(row['file']) ] = BashOperator(
task_id=('parseJSON_{}'.format(row['file'])),
bash_command=templated_parse,
xcom_push=True,
params={'file': row['file'],
'directory': file_directory,
'infile': '{}.json'.format(row['file'])},
dag=dag)
'dlJSON_{}'.format(row['file']) >> 'parseJSON_{}'.format(row['file']) >> all_tasks