У меня есть 3 задачи для выполнения в том же дагс. В то время как Задача1 возвращает список словарей Задача2 и Задача3 пытаются использовать один элемент словаря из результата, возвращаемого
задание 1.
def get_list():
....
return listOfDict
def parse_1(example_dict):
...
def parse_2(example_dict):
...
dag = DAG('dagexample', default_args=default_args)
data_list = PythonOperator(
task_id='get_lists',
python_callable=get_list,
dag=dag)
for data in data_list:
sub_task1 = PythonOperator(
task_id='data_parse1' + data['id'],
python_callable=parse_1,
op_kwargs={'dataObject': data},
dag=dag,
)
sub_task2 = PythonOperator(
task_id='data_parse2' + data['id'],
python_callable=parse_2,
op_kwargs={'dataObject': data},
dag=dag,
)