Воздушный поток - пропустить DAG, если датчик неисправен - PullRequest
0 голосов
/ 25 мая 2020

У меня есть процесс, в котором я жду файл каждую неделю, но этот файл имеет отметку времени в его имени для даты измерения. Итак, я знаю, что у меня что-то будет на этой неделе, и имя может быть от 2020-05-25 * .csv до 2020-05-31 * .csv.

Единственный способ, которым я узнаю, чтобы начать мои процессы с воздушным потоком - запустить датчик при запуске @daily и использовать дату выполнения, чтобы найти, есть ли файл. Дело в том, что, поскольку я не знаю, в какой день будет загружен файл, у меня будет 6 неисправных датчиков, поэтому 6 неудачных DAG-файлов и 1 успешный.

Пример части датчиков SFTP:

with DAG(
        "geometrie-sftp-to-safe",
        default_args=default_args,
        schedule_interval="@daily",
        catchup=True,
) as dag:
    starting_sensor = DummyOperator(
        task_id="starting_sensor"
    )

    sensor_sftp_A = SFTPSensor(
        task_id="sensor_sftp_A",
        path="/input/geometrie/prod/Track_Geometry-{{ ds_nodash }}_A.csv",
        sftp_conn_id="ssh_ftp_landing",
        poke_interval=60,
        soft_fail=True,
        mode="reschedule"
    )

Секунда с GCSSensor

with DAG(
        "geometrie-preprocessing",
        default_args=default_args,
        schedule_interval="@daily",
        catchup=True
) as dag:
    # File A
    sensor_gcs_A = GoogleCloudStorageObjectSensor(
        task_id="gcs-sensor_A",
        bucket="lisea-mesea-sea-cloud-safe",
        object="geometrie/original/track_geometry_{{ ds_nodash }}_A.csv",
        google_cloud_conn_id="gcp_conn",
        poke_interval=50
    )

Вот почему я хотел бы, чтобы DAG-файлы были пропущены, если и только если датчик вышел из строя. Если это что-то еще, я хотел бы настоящего провала.

1 Ответ

1 голос
/ 26 мая 2020

Airflow имеет несколько датчиков, которые определяют каталог для проверки наличия определенного файла. Параметр schedule_interval как None будет работать для вашего варианта использования, поскольку вы хотите, чтобы DAG запускался только при получении файла (учитывая, что файл может быть получен в любое время в течение недели).

В приведенном ниже примере для GCSSensor будет ощущаться ведро для конкретного типа файла и напечатает имя файла. Я почти уверен, что датчик SFTP должен работать таким же образом.

dag = DAG(
 dag_id='sensing-bucket',
 schedule_interval=None,
 default_args=args)

def new_file_detection(**context):
    value = context['ti'].xcom_pull(task_ids='list_Files')
    print('value is : '+str(value))

File_sensor = GoogleCloudStoragePrefixSensor(
                task_id='gcs_polling',
                bucket='lisea-mesea-sea-cloud-safe',
                prefix='geometrie/original/track_geometry_',
                dag=dag
            )

GCS_File_list = GoogleCloudStorageListOperator(
                    task_id='list_Files',
                    bucket='lisea-mesea-sea-cloud-safe',
                    prefix='geometrie/original/track_geometry_',
                    delimiter='.csv',
                    google_cloud_storage_conn_id='google_cloud_default',
                    dag=dag
                )

File_detection = PythonOperator(
                task_id='print_detected_filename',
                provide_context=True,
                python_callable=new_file_detection,
                dag=dag
            )

File_sensor >> GCS_File_list >> File_detection
...