Apache Воздушный поток: как динамически запускать несколько групп DAG? - PullRequest
0 голосов
/ 17 июня 2020

У меня есть 2 DAG: 1. DAG1 - бегун 2. DAG2 - конвейер

Мне нужно получить некоторую JSON конечную точку в DAG1, получить массив из N элементов response.data.items и запустить DAG2 для каждого каждый элемент с пройденным item.somedata

Как это сделать?

обн. Пробовал

dag = DAG(
    'fetch-1',
    default_args=default_args,
    description='Fetching emails',
    schedule_interval=timedelta(days=1),
)

t1 = DummyOperator(
  task_id='start',
  dag=dag,
)

r = requests.post('http://host.docker.internal:8080/fetch-emails')
j = r.json()


for _, msg in j.data.messages:
  tx = DummyOperator(
    task_id='email_pipeline_{}'.format(msg.id),
    dag=dag,
  )
  t1 >> tx

1 Ответ

0 голосов
/ 17 июня 2020

вы пытаетесь запустить даг, используя промежуточные результаты этого дага для создания самого себя. это невозможно.

вместо этого вы можете из dag1 запускать несколько запусков dag2 с использованием конечной точки dag_runs, как описано здесь .

...