Воздушный поток: пользовательские даты: отлично работает с датой и временем, но не с маятником - PullRequest
0 голосов
/ 10 июля 2020

Требование: Создание настраиваемой функции даты для использования в операторах, DAG и т. Д. c

Ниже находится файл DAG

DAG

from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from datetime import datetime, timedelta
from alerts.custom_date import strt_of_wk_strt_mon_dt, NEXT_DS_NODASH

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2020, 7, 8),
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

dag = DAG(dag_id = 'test_date',
        schedule_interval='@once',
        default_args=default_args)


def test(**kwargs):

    first_date = kwargs.get('execution_date', None)
    strt_wk_dt = kwargs.get('strt_wk_dt')
    next_ds_nodash = kwargs.get('next_ds_nodash')
    s3_key = kwargs.get('s3_key')

    print(f'EXECUTION DATE:{first_date}')
    print(f'STRT_WK_DT:{strt_wk_dt}')
    print(f'NEXT_DS_NODASH:{next_ds_nodash}')
    print(f'S#_KEY:{s3_key}')

with dag:
    execution_date = '{{ execution_date }}'
    next_ds_nodash = NEXT_DS_NODASH
    strt_wk_dt = strt_of_wk_strt_mon_dt()

    t1 = PythonOperator(
        task_id='show_template',
        python_callable=test,
        op_kwargs={'execution_date': execution_date,
                   'next_ds_nodash': next_ds_nodash,
                   'strt_wk_dt': strt_wk_dt,
                   's3_key':f'snowflakes/FEEDS/{strt_wk_dt}/abc_{strt_wk_dt}.csv'},
        provide_context=True)

С пакетом datetime Сначала я попытался использовать библиотеку DateTime, и она отлично работала, как показано на скриншоте ниже

Ниже cstm_date.py

from datetime import datetime, timedelta
import logging
logger = logging.getLogger(__name__)

NEXT_DS_NODASH = '{{ (execution_date + macros.timedelta(days=1)).strftime("%m%d%Y") }}'

def strt_of_wk_strt_mon_dt():
    return (datetime.today().date() - timedelta(days=datetime.today().weekday())).strftime('%Y_%m_%d')

Результат печатается, как показано ниже enter image description here

Next, I tried with using pendulum library, the output is not printing the date value ** Pendulum package**

Below is the cstm_date.py


import pendulum
import logging
logger = logging.getLogger(__name__)

NEXT_DS_NODASH = '{{ (execution_date + macros.timedelta(days=1)).strftime("%m%d%Y") }}'


def strt_of_wk_strt_mon_dt():
    today = pendulum.now()
    return today.start_of('week').format('YYYY_MM_DD')

На выходе не печатается значение STRT_WK_DT

введите описание изображения здесь

Что мне не хватает?

1 Ответ

1 голос
/ 10 июля 2020

Вы используете правильную инструкцию преобразования для datetime.strftime, но не используете строку datetime инструкцию преобразования в своей новой функции, которая использует pendulum.

Yours

def strt_of_wk_strt_mon_dt():
    today = pendulum.now()
    return today.start_of('week').format('YYYY_MM_DD')

Предназначено

def strt_of_wk_strt_mon_dt():
    today = pendulum.now()
    return today.start_of('week').format('%Y_%m_%d')
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...