Создание нескольких задач в Airflow DAG для индивидуальной обработки - PullRequest
0 голосов
/ 25 февраля 2019

В моей DAG есть части, которые генерируют списки, которые я не могу разбить на отдельные задачи, которые будут обрабатываться по отдельности.

Вот псевдо-пример:

def push(**kwargs):
    # """Pushes an XCom without a specific target"""
    for n in range(10):
        kwargs['ti'].xcom_push(key=f'vals', value=n)

def puller(**kwargs):
    ti = kwargs['ti']
    v1 = ti.xcom_pull(key='vals', task_ids='push')
    print(v1)

push = python_operator.PythonOperator(
    task_id='push',
    python_callable=push,
    provide_context=True
)

puller = python_operator.PythonOperator(
    task_id='puller',
    python_callable=puller,
    provide_context=True
)

Похоже, xcom_push использует только последнее значение, а не генерирует список.Поэтому мне пришлось бы загружать значения в push в список, а затем использовать цикл for в pull для обработки каждого элемента в отдельности.

Я вполне справляюсь с этим, но это кажется нелогичным при выполнении пакетных заданий.

Как бы я мог заставить пуллер тянуть одну из 10 задач, генерируемых push?

1 Ответ

0 голосов
/ 25 февраля 2019

Между запусками DAG вы не должны изменять структуру DAG, поэтому ваш съемник - это либо одна задача, предназначенная для извлечения всех значений, либо 10 задач, каждая из которых предназначена для извлечения одного из значений.

Вот как вы бы добавили все 10 значений с помощью xcom:

def push(**kwargs):
    # """Pushes an XCom without a specific target"""
    final_output = []
    for n in range(10):
        # doing work
        final_output.append(n)
    kwargs['ti'].xcom_push(key=f'vals', value=final_output)

push = python_operator.PythonOperator(
    task_id='push',
    python_callable=push,
    provide_context=True
)

И затем вы можете либо вытянуть все 10 из них следующим образом

def puller(**kwargs):
    ti = kwargs['ti']
    v1 = ti.xcom_pull(key='vals', task_ids='push')
    print(v1)  # [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

puller = python_operator.PythonOperator(
    task_id='puller',
    python_callable=puller,
    provide_context=True
)

Или по одному значению для каждого издесять заданий:

def puller(index=0, **kwargs):
    ti = kwargs['ti']
    v1 = ti.xcom_pull(key='vals', task_ids='push')[index]
    print(v1)

ten_ops = [python_operator.PythonOperator(
        task_id=f'puller_{n}',
        python_callable=puller,
        provide_context=True,
        op_kwargs={'index': n},
    ) for n in range(10)]

Надеюсь, это поможет, если только я неправильно понял вопрос.

...