Между запусками DAG вы не должны изменять структуру DAG, поэтому ваш съемник - это либо одна задача, предназначенная для извлечения всех значений, либо 10 задач, каждая из которых предназначена для извлечения одного из значений.
Вот как вы бы добавили все 10 значений с помощью xcom:
def push(**kwargs):
# """Pushes an XCom without a specific target"""
final_output = []
for n in range(10):
# doing work
final_output.append(n)
kwargs['ti'].xcom_push(key=f'vals', value=final_output)
push = python_operator.PythonOperator(
task_id='push',
python_callable=push,
provide_context=True
)
И затем вы можете либо вытянуть все 10 из них следующим образом
def puller(**kwargs):
ti = kwargs['ti']
v1 = ti.xcom_pull(key='vals', task_ids='push')
print(v1) # [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
puller = python_operator.PythonOperator(
task_id='puller',
python_callable=puller,
provide_context=True
)
Или по одному значению для каждого издесять заданий:
def puller(index=0, **kwargs):
ti = kwargs['ti']
v1 = ti.xcom_pull(key='vals', task_ids='push')[index]
print(v1)
ten_ops = [python_operator.PythonOperator(
task_id=f'puller_{n}',
python_callable=puller,
provide_context=True,
op_kwargs={'index': n},
) for n in range(10)]
Надеюсь, это поможет, если только я неправильно понял вопрос.