Обновлять dag воздушного потока ежедневно / периодически на основе кода определения файла dagfile. - PullRequest
0 голосов
/ 25 октября 2019

Есть ли способ ежедневно обновлять dag воздушного потока на основе кода определения файла dagfile? Например. обновить значения даты, которые могут использоваться в определении dag.

Для контекста: у меня есть dag воздушного потока, который ежедневно получает новые строки таблицы из удаленной базы данных и перемещает их в локальную базу данных. Для того, чтобы получить самые последние строки из удаленного, у нас есть функция, которая получает последнюю дату из локального. В настоящее время даг определен как ...

...
def get_latest_date(tablename):
    # get latest import date from local table
    ....

for table in tables: # type list(dict)

    task_1 = BashOperator(
        task_id='task_1_%s' % table["tablename"],
        bash_command='bash %s/task_1.sh %s' % (PROJECT_HOME, table["latest_date"]),
        execution_timeout=timedelta(minutes=30),
        dag=dag)

    task_2 = BashOperator(
        task_id='task_2_%s' % table["tablename"],
        bash_command='bash %s/task_2.sh' % PROJECT_HOME,
        execution_timeout=timedelta(minutes=30),
        dag=dag)

    task_1 >> task_2

, где таблицы - это дикты, где одно из их полей построено ранее в коде, чтобы быть строковым представлением самой последней даты для данной таблицы. При печати предполагаемой последней даты в сценарии task_1.sh, обнаруживается, что дата не обновляется каждый день. Нужен способ, чтобы список таблиц создавался заново каждый день, чтобы иметь правильные значения даты.

1 Ответ

0 голосов
/ 29 октября 2019

Используя приведенный ниже код, вы можете динамически извлекать latest_date из вашей локальной БД для каждой таблицы и использовать ее в своем BashOperator, используя Airflow XCom .

from airflow import DAG
import airflow
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import ShortCircuitOperator
from airflow.operators.bash_operator import BashOperator
import logging

from datetime import datetime, timedelta

args = {
    'owner': 'Airflow',
    'start_date': airflow.utils.dates.days_ago(2),
}

dag = DAG(
    dag_id='example_dag',
    default_args=args,
    schedule_interval=None,
)


def get_latest_date(**kwargs):
    # get latest import date from local table
    logging.info("Table Name: {0}".format(kwargs['table_name']))
    # below i am doing a datetime.today() for demonstration. In your function, it will be your actual logic to get the latest date from your local DB
    latest_date = (datetime.today() - timedelta(days=kwargs['date_diff'])).strftime('%d-%m-%Y')
    logging.info("Latest Date: {0}".format(latest_date))
    #pus the latest date to the task xcom
    kwargs['ti'].xcom_push(key='latest_date', value=latest_date)

    return latest_date

start_task = DummyOperator(task_id='Start_Task', dag=dag)
end_task = DummyOperator(task_id='End_Task', dag=dag)

# below list will no longer require latest_date entry in each of the table dictionary 
tables_list = [{'tablename': 'table1'}, {'tablename': 'table2'}, {'tablename': 'table3'}, {'tablename': 'table4'}]
# below i am using idx (index) for date difference. I am doing a date difference to get difference latest_date values for different tasks. This is just for demonstration purpose
for idx, table in enumerate(tables_list): # type list(dict)

    get_latest_date_task = ShortCircuitOperator(
        task_id='Get_Latest_Date_In_Table_{0}'.format(table['tablename']),
        provide_context=True,
        python_callable=get_latest_date,
        op_kwargs={
            'table_name': table['tablename'],
            'date_diff': idx
        },
        dag=dag)

    # you can create a variable xcom_str like below and use that xcom_str in BashOperator bash_command or you can directly embed that in bash_command (like I did in task_2 BashOperator)
    xcom_str = "{{ ti.xcom_pull(task_ids='Get_Latest_Date_In_Table_{}', key='latest_date') }}".format(table['tablename'])
    task_1 = BashOperator(
        task_id='task_1_{0}'.format(table['tablename']),
        bash_command='echo "{' + xcom_str + '}"',                
        execution_timeout=timedelta(minutes=30),
        dag=dag)

    task_2 = BashOperator(
        task_id='task_2_{0}'.format(table['tablename']),
        bash_command='echo "{{ ti.xcom_pull("Get_Latest_Date_In_Table_' + table['tablename'] + '", key="latest_date") }}"',
        execution_timeout=timedelta(minutes=30),
        dag=dag)

    start_task >> get_latest_date_task >> task_1 >> task_2 >> end_task
...