Совместное использование между операторами в DAG - PullRequest
0 голосов
/ 10 февраля 2020

Скажем, у меня есть DAG с dag_id в качестве run_id и конвейер T1> T2> T3, где T1, T2, T3 Python операторы.

Я хочу иметь возможность передавать параметры из T1 в T2 без сохранения их в базе данных / S3 и чтения их обратно в T2, так как это долго.

Я знаю, что если T1 дает сбой, то T2 не будет выполняться, и, посмотрев на это, я увидел, что решение основано на коде выхода (0/1), поэтому, похоже, нет способа передать параметры через ,

Кто-нибудь знает, могу ли я передать параметры / вывод следующему оператору без чтения / записи извне? Есть ли примеры такого, как я не смог найти ни одного.

1 Ответ

0 голосов
/ 11 февраля 2020

Как указано в документации по воздушному потоку, вы можете использовать Xcoms. XComs позволяет задачам обмениваться сообщениями, предоставляя более тонкие формы контроля и общего состояния. Название является аббревиатурой «кросс-коммуникация». XComs в основном определяются ключом, значением и временной меткой, но также отслеживают такие атрибуты, как задача / группа обеспечения доступности баз данных, которая создала XCom и когда он должен стать видимым. Любой объект, который можно протравить, можно использовать в качестве значения XCom, поэтому пользователи должны обязательно использовать объекты соответствующего размера. Проверьте эту ссылку: https://airflow.apache.org/docs/stable/concepts.html?highlight=xcom#xcoms

Скажите, что T1 имеет идентификатор задачи t1 и вызывает python function fun c.

def func():
    return some_value

Вы можете получить возвращаемое значение fun c, используя xcom. Скажем, T2 вызывает некоторую python функцию sample_function . Затем вы можете получить fun c возвращаемое значение как

def sample_function(**context):
    value = context['task_instance'].xcom_pull(task_ids='t1')

Обратите внимание, вам нужно передать контекст в вашей функции, это можно сделать, упомянув provide_context=True в операторе.

...