Воздушный поток не получает start_date от DAG - PullRequest
0 голосов
/ 19 июня 2020

Я регистрирую новый DAG следующим образом:

from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.operators.bash_operator import BashOperator
from airflow.utils.dates import days_ago
from airflow.hooks.base_hook import BaseHook
from datetime import datetime, timedelta, timezone
import pendulum

local_tz = pendulum.timezone("UTC")

default_args = {
    'owner': 'me',
    'depends_on_past': False,
    'start_date': datetime(2020, 6, 19, 9, 37, 35, tzinfo=local_tz),
    'email': ["blah@blah.com"],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=15)
}
dag = DAG(
    dag_id="some_id",
    default_args=default_args,
    description= "Some description",
    schedule_interval="@once"
)

def to_be_executed_py():
    print("I did it, ma!")

with dag:
    t1 = PythonOperator(
        task_id="some_id",
        python_callable=to_be_executed_py)

Я хочу, чтобы это выполнялось один раз и только один раз во время, указанное в start_date. После загрузки DAG (с использованием S3) я не вижу в деталях "start_date". Вместо этого я вижу в деталях (под default_args):

{'owner': 'me', 
'depends_on_past': False, 
'start_date': datetime.datetime(2020, 6, 19, 9, 37, 35, tzinfo=<TimezoneInfo [UTC, GMT, +00:00:00, STD]>), 
'email': ['bleh@blah.com'], 
'email_on_failure': False, 
'email_on_retry': False, 
'retries': 1, 
'retry_delay': datetime.timedelta(0, 900)}

Я здесь что-то делаю не так? Правильно ли я делаю предположение, что это должно выполняться при заданном start_time? Я просмотрел похожие варианты использования, но не многие из них устанавливают start_date для включения времени.

ОБНОВЛЕНИЕ

В настоящее время группа DAG запускается сразу после возобновления работы. Определенно не набирая время начала. Все ресурсы, которые я нашел в Интернете, не имеют здесь подходящего ответа.

Ответы [ 2 ]

1 голос
/ 21 июня 2020

Разобрался в проблеме. Это было двояко. Один из них, переводчик, который мы использовали, работал на 12 часов. Поскольку это было вечером, он устанавливал это в прошлое (заставляя Airflow играть в догонялки).

Во-вторых, нам не нужен часовой пояс. К тому же мы не ставили dag в задаче. Таким образом, код должен выглядеть следующим образом:

from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.operators.bash_operator import BashOperator
from airflow.utils.dates import days_ago
from airflow.hooks.base_hook import BaseHook
from datetime import datetime, timedelta, timezone

default_args = {
    'owner': 'me',
    'depends_on_past': False,
    'start_date': datetime(2020, 6, 19, 21, 37, 35),
    'email': ["blah@blah.com"],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=15)
}
dag = DAG(
    dag_id="some_id",
    default_args=default_args,
    description= "Some description",
    schedule_interval="@once"
)

def to_be_executed_py(ds, **kwargs):
    print("I did it, ma!")

with dag:
    t1 = PythonOperator(
        dag=dag,
        provide_context=True,
        task_id="some_id",
        python_callable=to_be_executed_py)

С этими изменениями все выполняется в заданное время, один раз и только один раз.

0 голосов
/ 20 июня 2020

С помощью комбинации schedule_interval='@daily', ShortCircuitOperator и переменных воздушного потока вы можете найти обходной путь; DAG запускается каждый день и проверяет, находится ли сегодняшний день в списке дней, которые вы ввели в качестве переменной воздушного потока. Если это так, продолжайте и запускайте последующие задачи, а если нет, он пропускает последующие задачи и ждет следующего выполнения завтра. вот определение DAG:

import airflow.utils.helpers
from airflow.models import DAG, Variable
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import ShortCircuitOperator

args = {
    'owner': 'airflow',
    'start_date': airflow.utils.dates.days_ago(2)
}

dag = DAG(
    dag_id='run_on_release_day',
    default_args=args,
    schedule_interval='@daily'
)

def check_release_date(**context):
    release_dates = Variable.get('release_dates')
    print(context, release_dates)
    return context['ds'] in release_dates

cond = ShortCircuitOperator(
    task_id='condition',
    python_callable=check_release_date,
    dag=dag,
    provide_context=True,
)

tasks = [DummyOperator(task_id='task_' + str(i), dag=dag) for i in [1, 2]]

airflow.utils.helpers.chain(cond, *tasks)
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...