Я регистрирую новый DAG следующим образом:
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.operators.bash_operator import BashOperator
from airflow.utils.dates import days_ago
from airflow.hooks.base_hook import BaseHook
from datetime import datetime, timedelta, timezone
import pendulum
local_tz = pendulum.timezone("UTC")
default_args = {
'owner': 'me',
'depends_on_past': False,
'start_date': datetime(2020, 6, 19, 9, 37, 35, tzinfo=local_tz),
'email': ["blah@blah.com"],
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=15)
}
dag = DAG(
dag_id="some_id",
default_args=default_args,
description= "Some description",
schedule_interval="@once"
)
def to_be_executed_py():
print("I did it, ma!")
with dag:
t1 = PythonOperator(
task_id="some_id",
python_callable=to_be_executed_py)
Я хочу, чтобы это выполнялось один раз и только один раз во время, указанное в start_date
. После загрузки DAG (с использованием S3) я не вижу в деталях "start_date". Вместо этого я вижу в деталях (под default_args
):
{'owner': 'me',
'depends_on_past': False,
'start_date': datetime.datetime(2020, 6, 19, 9, 37, 35, tzinfo=<TimezoneInfo [UTC, GMT, +00:00:00, STD]>),
'email': ['bleh@blah.com'],
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': datetime.timedelta(0, 900)}
Я здесь что-то делаю не так? Правильно ли я делаю предположение, что это должно выполняться при заданном start_time
? Я просмотрел похожие варианты использования, но не многие из них устанавливают start_date
для включения времени.
ОБНОВЛЕНИЕ
В настоящее время группа DAG запускается сразу после возобновления работы. Определенно не набирая время начала. Все ресурсы, которые я нашел в Интернете, не имеют здесь подходящего ответа.