Передача данных между компонентами в Airflow - PullRequest
0 голосов
/ 05 апреля 2019

Я очень новичок в Airflow и прочитал большую часть его документации. Из документации я понимаю, что небольшие данные между компонентами в группе обеспечения доступности баз данных могут совместно использоваться с помощью класса XCom. Компонент в группе обеспечения доступности баз данных, публикующий данные, должен выдвинуть, а компонент, подписывающийся на данные, должен выдвинуть.

Тем не менее, я не очень хорошо разбираюсь в синтаксической части толкания и вытягивания. Я имею в виду раздел XCom по документации и разработал шаблон кода. Предположим, у меня есть следующий код, который имеет только два компонента: толкатель и пуллер. Толкатель публикует текущее время, которое съемщик должен использовать, и записывает его в файл журнала.

from datetime import datetime
from airflow import DAG
from airflow.operators.python_operator import PythonOperator

log_file_location = '/usr/local/airflow/logs/time_log.log'

default_args = {'owner':'apache'}
dag = DAG('pushpull', default_args = default_args)

def push_function():
    #push this data on the DAG as key-value pair
    return(datetime.now()) #current time

def pull_function():
    with open(log_file_location, 'a') as logfile:
        current_time = '' #pull data from the pusher as key - value pair
        logfile.writelines('current time = '+current_time)
    logfile.close()

with dag:
    t1 = PythonOperator(
        task_id = 'pusher', 
        python_callable = push_function)

    t2 = PythonOperator(
        task_id = 'puller', 
        python_callable = pull_function)

    t2.set_upstream(t1)

Мне нужна помощь мастеров Airflow по двум синтаксисам:

  1. Как передать данные из функции push вместе с клавишей
  2. Как получить функцию извлечения данных, используя данные ключа.

Спасибо!

1 Ответ

1 голос
/ 05 апреля 2019

Пример нажатия на Xcom с помощью клавиши:

def push_function(**context):
    msg='the_message'
    print("message to push: '%s'" % msg)
    task_instance = context['task_instance']
    task_instance.xcom_push(key="the_message", value=msg)

Пример перехода на Xcom с помощью клавиши:

def pull_function(**kwargs):
    ti = kwargs['ti']
    msg = ti.xcom_pull(task_ids='push_task',key='the_message')
    print("received message: '%s'" % msg)

Пример DAG:

from datetime import datetime, timedelta
from airflow.models import DAG
from airflow.operators.python_operator import PythonOperator

DAG = DAG(
  dag_id='simple_xcom',
  start_date=datetime(2017, 10, 26),
  schedule_interval=timedelta(1)
)

def push_function(**context):
    msg='the_message'
    print("message to push: '%s'" % msg)
    task_instance = context['task_instance']
    task_instance.xcom_push(key="the_message", value=msg)

push_task = PythonOperator(
    task_id='push_task', 
    python_callable=push_function,
    provide_context=True,
    dag=DAG)

def pull_function(**kwargs):
    ti = kwargs['ti']
    msg = ti.xcom_pull(task_ids='push_task',key='the_message')
    print("received message: '%s'" % msg)

pull_task = PythonOperator(
    task_id='pull_task', 
    python_callable=pull_function,
    provide_context=True,
    dag=DAG)

push_task >> pull_task
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...