У меня есть 2 DAG: 1. DAG1 - бегун 2. DAG2 - конвейер
Мне нужно получить некоторую JSON конечную точку в DAG1, получить массив из N элементов response.data.items
и запустить DAG2 для каждого каждый элемент с пройденным item.somedata
Как это сделать?
обн. Пробовал
dag = DAG(
'fetch-1',
default_args=default_args,
description='Fetching emails',
schedule_interval=timedelta(days=1),
)
t1 = DummyOperator(
task_id='start',
dag=dag,
)
r = requests.post('http://host.docker.internal:8080/fetch-emails')
j = r.json()
for _, msg in j.data.messages:
tx = DummyOperator(
task_id='email_pipeline_{}'.format(msg.id),
dag=dag,
)
t1 >> tx