Требование: Создание настраиваемой функции даты для использования в операторах, DAG и т. Д. c
Ниже находится файл DAG
DAG
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from datetime import datetime, timedelta
from alerts.custom_date import strt_of_wk_strt_mon_dt, NEXT_DS_NODASH
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime(2020, 7, 8),
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=5),
}
dag = DAG(dag_id = 'test_date',
schedule_interval='@once',
default_args=default_args)
def test(**kwargs):
first_date = kwargs.get('execution_date', None)
strt_wk_dt = kwargs.get('strt_wk_dt')
next_ds_nodash = kwargs.get('next_ds_nodash')
s3_key = kwargs.get('s3_key')
print(f'EXECUTION DATE:{first_date}')
print(f'STRT_WK_DT:{strt_wk_dt}')
print(f'NEXT_DS_NODASH:{next_ds_nodash}')
print(f'S#_KEY:{s3_key}')
with dag:
execution_date = '{{ execution_date }}'
next_ds_nodash = NEXT_DS_NODASH
strt_wk_dt = strt_of_wk_strt_mon_dt()
t1 = PythonOperator(
task_id='show_template',
python_callable=test,
op_kwargs={'execution_date': execution_date,
'next_ds_nodash': next_ds_nodash,
'strt_wk_dt': strt_wk_dt,
's3_key':f'snowflakes/FEEDS/{strt_wk_dt}/abc_{strt_wk_dt}.csv'},
provide_context=True)
С пакетом datetime Сначала я попытался использовать библиотеку DateTime, и она отлично работала, как показано на скриншоте ниже
Ниже cstm_date.py
from datetime import datetime, timedelta
import logging
logger = logging.getLogger(__name__)
NEXT_DS_NODASH = '{{ (execution_date + macros.timedelta(days=1)).strftime("%m%d%Y") }}'
def strt_of_wk_strt_mon_dt():
return (datetime.today().date() - timedelta(days=datetime.today().weekday())).strftime('%Y_%m_%d')
Результат печатается, как показано ниже
Next, I tried with using pendulum library, the output is not printing the date value
** Pendulum package**
Below is the cstm_date.py
import pendulum
import logging
logger = logging.getLogger(__name__)
NEXT_DS_NODASH = '{{ (execution_date + macros.timedelta(days=1)).strftime("%m%d%Y") }}'
def strt_of_wk_strt_mon_dt():
today = pendulum.now()
return today.start_of('week').format('YYYY_MM_DD')
На выходе не печатается значение STRT_WK_DT
введите описание изображения здесь
Что мне не хватает?