Apache Воздушный поток - используйте python результат в следующих шагах - PullRequest
0 голосов
/ 29 мая 2020

Я работаю над простым Apache Airflow DAG. Моя цель:
1. вычислить параметр данных на основе даты запуска DAG - я пытаюсь добиться этого с помощью оператора Python.
2. передать рассчитанный выше параметр как параметр запроса bq.

Любые идеи приветствуются.

Мой код ниже - я пометил две точки, с которыми я борюсь, меткой «TODO».

...

def set_date_param(dag_run_time):
    # a business logic applied here
    ....

    return "2020-05-28" # example result

# --------------------------------------------------------
# DAG definition below
# --------------------------------------------------------

# Python operator
set_data_param = PythonOperator(
  task_id='set_data_param',
  python_callable=set_data_param,
  provide_cotext=True,
  op_kwargs={
    "dag_run_date": #TODO - how to pass the DAG running date as a function input parameter
  },
  dag=dag
)

# bq operator
load_data_to_bq_table = BigQueryOperator(
    task_id='load_data_to_bq_table',
    sql="""SELECT ccustomer_id, sales
    FROM `my_project.dataset1.table1` 
    WHERE date_key = {date_key_param}
    """.format(
       date_key_param = 
   ), #TODO - how to get the python operator results from the previous step
    use_legacy_sql=False,
    destination_dataset_table="my_project.dataset2.table2}",
    trigger_rule='all_success',
    dag=dag
)

set_data_param >> load_data_to_bq_table

1 Ответ

2 голосов
/ 29 мая 2020
  1. Чтобы PythonOperator передавал дату выполнения в python_callable, вам нужно всего лишь установить provide_cotext=True (как это уже было сделано в вашем примере). Таким образом, Airflow автоматически передает коллекцию аргументов ключевого слова вызываемому python, так что имена и значения этих аргументов эквивалентны шаблонным переменным, описанным здесь . То есть, если вы определите вызываемый python как set_data_param(ds, **kwargs): ..., параметр ds автоматически получит дату выполнения в виде строкового значения в формате YYYY-MM-DD.

  2. XCOM позволяет экземплярам задач обмениваться сообщениями. Чтобы использовать дату, возвращаемую set_date_param(), внутри строки запроса sql BigQueryOperator, вы можете объединить XCOM с шаблоном Jinja :

sql="""SELECT ccustomer_id, sales
    FROM `my_project.dataset1.table1` 
    WHERE date_key = {{ task_instance.xcom_pull(task_ids='set_data_param') }}
    """

В следующем полном примере собраны все части вместе. В этом примере задача get_date создает строку даты на основе даты выполнения. После этого задача use_date использует шаблоны XCOM и Jinja для извлечения строки даты и записи ее в журнал.

import logging
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.utils.dates import days_ago

default_args = {'start_date': days_ago(1)}


def calculate_date(ds, execution_date, **kwargs):
    return f'{ds} ({execution_date.strftime("%m/%d/%Y")})'


def log_date(date_string):
    logging.info(date_string)


with DAG(
    'a_dag',
    schedule_interval='*/5 * * * *',
    default_args=default_args,
    catchup=False,
) as dag:
    get_date = PythonOperator(
        task_id='get_date',
        python_callable=calculate_date,
        provide_context=True,
    )
    use_date = PythonOperator(
        task_id='use_date',
        python_callable=log_date,
        op_args=['Date: {{ task_instance.xcom_pull(task_ids="get_date") }}'],
    )
get_date >> use_date
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...