Я написал задачу на Python для Airflow. Когда я загружаю DAG в Airflow, он показывает все отлично. Как только я запускаю прогон для своей группы DAG, он создает прогон и переключается на успешное выполнение, даже не выполняя мою задачу. Веб-сервер и планировщик работают, и в журнале ничего нет. Задача даже не получает статус во время выполнения (даже не пропущена).
Если я запускаю свою задачу напрямую, используя airflow test update_dags work 2019-01-01
, она выполняется просто отлично.
Это мой DAG:
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from datetime import datetime, timedelta
import os
default_args = {
'owner': 'Airflow',
'depends_on_past': False,
'start_date': datetime.today(),
'email': ['***redacted***'],
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=5),
'params': {
'git_repository': '***redacted***',
'git_ref': 'origin/master',
'git_folder': '/opt/dag-repository'
}
}
def command(cmd: str, *args):
templated_command = cmd.format(*args)
print('Running command: {}'.format(templated_command))
os.system(templated_command)
def do_work(**kwargs):
params = kwargs['params']
dag_directory = kwargs['conf'].get('core', 'dags_folder')
git_repository = params['git_repository']
git_ref = params['git_ref']
git_folder = params['git_folder']
command('if [ ! -d {0} ]; then git clone {1} {0}; fi', git_folder, git_repository)
command('cd {0}; git fetch -apt', git_folder)
command('cd {0}; git reset --hard {1}', git_folder, git_ref)
command('ln -sf {0} {1}', '{}/src'.format(git_folder), dag_directory)
with DAG('update_dags', default_args=default_args, schedule_interval=timedelta(minutes=10), max_active_runs=1) as dag:
work_stage = PythonOperator(
task_id="work",
python_callable=do_work,
provide_context=True
)