Доступ к переменным Airflow по умолчанию вне оператора - PullRequest
0 голосов
/ 08 мая 2020

Я пытаюсь создать динамический c список задач, чтобы проверить предыдущие запуски пакета за день, завершился он или нет. Для этого у меня есть время (ЧЧММ), хранящееся в переменной Airflow, и я использовал переменную datetime.now () для получения текущего ЧЧММ и создания списка предыдущих запусков. Но поскольку метка Airflow проверяется каждый раз, она выбирает последнюю дату и время и на ее основе генерирует новый предыдущий список задач.

Я пытался увидеть вместо сравнения datetime.now (), используя переменные воздушного потока по умолчанию {{ds}} и {{ts}}, чтобы избежать вышеуказанной проблемы. Но он обрабатывает эти переменные как String или не может распознать их как переменные и выбрасывает переменную ts / ds, которая не определена.

есть ли способ / обходной путь для доступа к этим переменным вне операторов, как указано выше logi c для создания списка динамических c задач, которые будут запускаться на основе проверки завершения предыдущего пакетного запуска.

Заранее спасибо.

from datetime import datetime,timedelta,date
from pytz import timezone, utc
import pendulum

## Below would come from Airflow variable.
dag_times = ["0700", "0715", "0730" ,"0730", "0930","1130","1330","1630","2000"]

## This is the code to get the current time.. this is keep changing as the airflow validates the DAG.
current_dag_time = datetime.now().astimezone(timezone('US/Pacific')).strftime('%H%M')

schedule_run_time = min(dag_times, key=lambda x:abs(int(x)-int(current_dag_time)))
current_run = dag_times.index(schedule_run_time)
print("current_run",current_run)
intra_day_time = dag_times[dag_times.index(schedule_run_time)-1] if current_run > 0 else schedule_run_time
previous_runs = []
if current_run > 0:
    # print(dag_times.index(schedule_run_time))
    previous_runs = dag_times[0:dag_times.index(schedule_run_time)] 
else:
    previous_runs.append(dag_times[-1])

previous_run_tasks=[]

for dag_name in previous_runs:
    item = {}
    if int(dag_name) == 0:
        if date.today().weekday() == 0 :
            start_time =-52
            end_time = 4
        else:
            start_time =-24
            end_time = 24
        # poke_task_name = "SAMPLE_BOX_%s" % dag_name
        item = {"poke_task_name": "SAMPLE_BOX_%s" % dag_name, "start_time":start_time, "end_time":end_time}
    elif int(dag_name) > 0 :
        start_time =0
        end_time = 24
        poke_task_name = "SAMPLE_BOX_%s" % dag_name
        item = {"poke_task_name": "SAMPLE_BOX_%s" % dag_name, "start_time":start_time, "end_time":end_time}
    else:
        print("error")
    previous_run_tasks.append(item)
    print(previous_run_tasks)

if int(schedule_run_time) == 0:
    if date.today().weekday() == 0 :
        start_time =-52
        end_time = 4
    else:
        start_time =-24
        end_time = 24
    poke_task_name = "SAMPLE_BOX_%s" % dag_times[-1]
    generate_task_name = "SAMPLE_BOX_%s" % schedule_run_time
elif int(schedule_run_time) > 0 :
    start_time =0
    end_time = 24
    poke_task_name = "SAMPLE_BOX_%s" % intra_day_time
    generate_task_name = "SAMPLE_BOX_%s" % schedule_run_time
else:
    print("error")

print("start_time::::",start_time)
print("end_time::::",end_time)
print("generate_task_name::::",generate_task_name)
print("poke_task_name::::",poke_task_name)

1 Ответ

0 голосов
/ 11 мая 2020

Эти переменные Airflow по умолчанию создаются только в контексте экземпляра задачи для данного запуска DAG, и поэтому они доступны только в шаблонных полях каждого оператора. Попытка использовать их вне этого контекста не сработает.

Я подготовил простой DAG с задачей, которая отображает дату выполнения (ds) в качестве параметра:

from airflow import macros
from airflow import models
from airflow.operators import bash_operator
import datetime

yesterday = datetime.datetime.combine(
    datetime.datetime.today() - datetime.timedelta(1),
    datetime.datetime.min.time())

default_args = {
            "start_date": yesterday,
            "retries": 1,
            "email_on_failure": False,
            "email_on_retry": False,
            "email": "youremail@host.com"
}

with models.DAG(
    'printing_the_execution_date_ts', 
    schedule_interval=datetime.timedelta(days=1), 
    default_args=default_args) as dag:

    printing_the_execution_date = bash_operator.BashOperator(
            task_id="display",
            bash_command="echo {{ ds }}"
    )

    printing_the_execution_date

The {{ }} скобки сообщают Airflow, что это шаблон Jinja.

Вы также можете использовать переменную ts, которая является датой выполнения в формате ISO 8601. Таким образом, в прогоне dag с отметкой 2020-05-10 это будет отображаться в:

'echo {{ ds }}'
echo 2020-05-10

'echo {{ ts }}'
echo 2020-05-10T00:00:00+00:00

Я рекомендую вам поискать этот поток Stackoverflow , где вы можете найти пример с используя PythonOperator.

...