Как выровнять дату выполнения в настройке без захвата в потоке воздуха? - PullRequest
0 голосов
/ 12 сентября 2018

В настройках захвата я заметил, что дата выполнения, отправленная исполнителю, правильно выровнена, но когда я отключаю настройку захвата следующим образом:

import airflow
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.dummy_operator import DummyOperator
from datetime import datetime, timedelta

default_args = {
    'owner': 'airflow',
    'depends_on_past': True,
    'start_date': airflow.utils.dates.days_ago(5),
    'schedule_interval': '@daily'
}

dag = DAG('xiang-01', catchup=False, default_args=default_args)

task = BashOperator(
    task_id='task',
    bash_command='echo "{{ task_instance_key_str }} {{ ts }}" && sleep 10',
    dag=dag)

Дата выполнения не выровнена, например, обработанная задача:

echo "xiang-01__task__20180909 2018-09-09T22:33:17.961926+00:00" && sleep 10

Согласно документу, оно должно быть выровнено: https://airflow.apache.org/scheduler.html#backfill-and-catchup

Так чего мне не хватает?

Обновление:

Если быть более точным, поскольку моя начальная дата устанавливается с помощью days_ago(5), которая установлена ​​в полночь 5 дней назад, начиная с 00:00:00. Я ожидал, что дата выполнения также должна быть выровнена к полуночи, что-то вроде 2018-09-09T00:00:00, но я получил время 2018-09-09T22:33:17.961926+00:00, которое выглядит как выровненное к тому времени, когда я приостановил этот DAG.

1 Ответ

0 голосов
/ 12 сентября 2018

Я разобрался, работает следующая модификация:

import airflow
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.dummy_operator import DummyOperator
from datetime import datetime, timedelta

default_args = {
    'owner': 'airflow',
    'depends_on_past': True,
    'start_date': airflow.utils.dates.days_ago(5),
    #'schedule_interval': '@daily'
}

dag = DAG('xiang-02', catchup=False, default_args=default_args, schedule_interval='@daily')

task = BashOperator(
    task_id='task',
    bash_command='echo "{{ task_instance_key_str }} {{ ts }}" && sleep 10',
    dag=dag)

Подвох, похоже, schedule_interval теперь является аргументом DAG, если я использую его в конструкции DAG, он теперь работает правильно.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...