Я пытаюсь использовать сенсорный оператор Hdfs в Airflow для запуска следующих задач в зависимости от прибытия файла по указанному пути. Но, когда я развертываю dag, получаю ошибку как
Broken DAG: [/usr/local/airflow/dags/test_sensor_dag.py] неверный синтаксис (client.py, строка 1473) Код:
from airflow import DAG
from airflow.sensors.hdfs_sensor import HdfsSensor
from datetime import datetime
default_args = {'owner': 'airflow',
'depends_on_past': False,
'provide_context': True,
'start_date': datetime(2020, 3, 19, 0, 0),
'email': ['hr@***.com'],
'email_on_failure': True,
'email_on_retry': False,
'retries': 0,
'concurrency': 1
}
# run it daily at 6AM
schedule_interval = '00 6 * * *'
dag_name = 'test_sensor_dag'
dag = DAG(
dag_id=dag_name,
default_args=default_args,
schedule_interval=schedule_interval)
source_data_sensor = HdfsSensor(
task_id='source_data_sensor',
filepath='/data/test/file.csv',
poke_interval=10,
timeout=5,
dag=dag
).poke()
success_notification = EmailOperator(to=['hr@***.com'], task_id='success_notification',
subject='[Success:] test for {{ ds }}',
html_content='Successfully ran the DAG',
dag=dag)
source_data_sensor
success_notification.set_upstream(source_data_sensor)