Apache airflow, автоматическая дата обратной засыпки - PullRequest
0 голосов
/ 08 мая 2018

У меня есть группа доступности базы данных, которую я хочу использовать для обратной заполнения таблицы базы данных.

from airflow import DAG
from airflow.operators.postgres_operator import PostgresOperator
from datetime import datetime, timedelta

args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2018, 4, 1),
    'retry_delay': timedelta(minutes=1),
}

dag = DAG(dag_id='airflow_backfill', default_args=args, schedule_interval='@daily')

"""
    Task for inserting data per day
"""

task1 = PostgresOperator(
    task_id='insert_new_row',
    postgres_conn_id='aws_pg',
    sql="INSERT INTO airflow_test(date_at) VALUES('2018-04-01')",
    dag=dag,
)

task2 = PostgresOperator(
    task_id='update_team_name',
    postgres_conn_id='aws_pg',
    sql="UPDATE airflow_test SET team_name = (SELECT team_name FROM teams ORDER BY RANDOM() LIMIT 1) WHERE team_name is NULL",
    dag=dag,
)

task1.set_downstream(task2)

Я вставляю одну строку в базу данных с 1 апреля 2018 года, но проблема в том, что я даю date_atпеременная жестко запрограммирована.

Мой вопрос, есть ли способ, которым я могу дать дату обратной засыпки в качестве значения вставки?Я хочу установить значение date_at автоматически при выполнении обратной засыпки, но не нашел никакой переменной среды / конфигурации воздушного потока, из которой я могу автоматически получить дату обратной засыпки.

Я использую apache airflow 1.9.0,Спасибо.

1 Ответ

0 голосов
/ 08 мая 2018

РЕДАКТИРОВАНИЕ: Вы должны иметь возможность использовать шаблон jinja, чтобы получить переменную execute_date:

task1 = PostgresOperator(
    task_id='insert_new_row',
    postgres_conn_id='aws_pg',
    sql="INSERT INTO airflow_test(date_at) VALUES('{{ ds }}')",
    dag=dag,
)

https://airflow.apache.org/code.html#default-variables

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...