PythonOperator с набором python_callable выполняется постоянно - PullRequest
0 голосов
/ 05 ноября 2018
import airflow
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from datetime import datetime, timedelta
from workflow.task import some_task

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'email': ['jimin.park1@aig.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 0,
    'retry_delay': timedelta(minutes=1),
    'start_date': airflow.utils.dates.days_ago(0)
    # 'queue': 'bash_queue',
    # 'pool': 'backfill',
    # 'priority_weight': 10,
    # 'end_date': datetime(2016, 1, 1),
}

dag = DAG('JiminTest', default_args=default_args, schedule_interval='*/1 * * * *', catchup=False)

t1 = PythonOperator(
    task_id='Task1',
    provide_context=True,
    python_callable=some_task,
    dag=dag
)

Сам фактический some_task просто добавляет метку времени к некоторому файлу. Как видно из конфигурационного файла dag, сама задача настроена на выполнение каждые 1 мин.

def some_task(ds, **kwargs):
    current_time = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    with open("test.txt", "a") as myfile:
        myfile.write(current_time + '\n')

Я просто включил выходной файл и запустил веб-сервер без запуска планировщика. Эта функция вызывалась, и все добавлялось в файл при запуске веб-сервера. Когда я запускаю планировщик, в каждом цикле выполнения файл добавляется.

Я хочу, чтобы функция выполнялась каждую минуту, а не каждый цикл выполнения.

Ответы [ 2 ]

0 голосов
/ 06 ноября 2018

Попробуйте проверить параметр конфигурации scheduler_heartbeat_sec в вашем файле конфигурации. Для вашего случая это должно быть меньше 60 секунд.

Если вы не хотите, чтобы планировщик не выполнял предварительные проверки, установите для параметра catchup_by_default значение False (хотя я не уверен, относится ли это к вашему вопросу).

Укажите, какую версию Apache Airflow вы используете

0 голосов
/ 05 ноября 2018

Планировщик будет запускать каждый файл DAG каждый цикл планировщика, включая все операторы импорта.

Есть ли какой-нибудь исполняемый код в файле, откуда вы импортируете функцию?

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...