Оператор воздушного потока HdfsSensor не работает - PullRequest
0 голосов
/ 24 марта 2020

Я пытаюсь использовать сенсорный оператор Hdfs в Airflow для запуска следующих задач в зависимости от прибытия файла по указанному пути. Но, когда я развертываю dag, получаю ошибку как

Broken DAG: [/usr/local/airflow/dags/test_sensor_dag.py] неверный синтаксис (client.py, строка 1473) Код:

from airflow import DAG
from airflow.sensors.hdfs_sensor import HdfsSensor
from datetime import datetime


default_args = {'owner': 'airflow',
                'depends_on_past': False,
                'provide_context': True,
                'start_date': datetime(2020, 3, 19, 0, 0),
                'email': ['hr@***.com'],
                'email_on_failure': True,
                'email_on_retry': False,
                'retries': 0,
                'concurrency': 1
                }

# run it daily at 6AM
schedule_interval = '00 6 * * *'

dag_name = 'test_sensor_dag'

dag = DAG(
    dag_id=dag_name,
    default_args=default_args,
    schedule_interval=schedule_interval)

source_data_sensor = HdfsSensor(
    task_id='source_data_sensor',
    filepath='/data/test/file.csv',
    poke_interval=10,
    timeout=5,
    dag=dag
).poke()

success_notification = EmailOperator(to=['hr@***.com'], task_id='success_notification',
                                     subject='[Success:] test for {{ ds }}',
                                     html_content='Successfully ran the DAG',
                                     dag=dag)

source_data_sensor
success_notification.set_upstream(source_data_sensor)

1 Ответ

0 голосов
/ 05 апреля 2020

Вам не нужно вызывать .poke() в вашем коде, airflow выполняет функцию poke во время выполнения

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...