несколько путей к файлам в S3KeySensor на Airflow - PullRequest
0 голосов
/ 06 марта 2020

У меня есть некоторые задачи, которые нужно запускать, когда один из немногих определенных файлов или каталогов изменяется на S3.

Допустим, у меня есть PythonOperator, и он должен запускаться, если /path/file.csv изменяется или если /path/nested_path/some_other_file.csv изменится.

Я попытался создать динамические c KeySensors, как это:

    trigger_path_list = ['/path/file.csv', '//path/nested_path/some_other_file.csv']
    for trigger_path in trigger_path_list:
        file_sensor_task = S3KeySensor(
                    task_id=get_sensor_task_name(trigger_path),
                    poke_interval=30,
                    timeout=60 * 60 * 24 * 8,
                    bucket_key=os.path.join('s3://', s3_bucket_name, trigger_path),
                    wildcard_match=True)
                file_sensor_task >> main_task

Однако, это будет означать, что оба S3KeySensors должны быть запущен для того, чтобы он был обработан. Я также попытался сделать обе задачи уникальными, как здесь:

        for trigger_path in trigger_path_list:
            main_task = PythonOperator(
task_id='{}_task_triggered_by_{}'.format(dag_name, trigger_path), 
...)
            file_sensor_task = S3KeySensor(
                task_id=get_sensor_task_name(trigger_path),
                poke_interval=30,
                timeout=60 * 60 * 24 * 8,
                bucket_key=os.path.join('s3://', s3_bucket_name, trigger_path),
                wildcard_match=True)
            file_sensor_task >> main_task

Однако это будет означать, что DAG не завершит работу sh, если все файлы из списка не появятся. Поэтому, если /path/file.csv появляется 2 раза подряд, он не будет запущен во второй раз, так как эта часть DAG будет завершена.

Нет способа передать несколько файлов к S3KeySensor? Я не хочу создавать один DAG для каждого пути, так как для меня это будет 40 DAGS на 5 путей, что дает около 200 DAG.

Есть идеи?

1 Ответ

1 голос
/ 07 марта 2020

Пара идей для этого:

  1. Используйте другую задачу Airflow правила триггера , в частности, вы, вероятно, захотите one_success в основной задаче, что означает лишь одно из множества исходных данных. датчики должны успешно выполнять задачу. Это означает, что другие датчики будут продолжать работать, но вы можете потенциально использовать флаг soft_fail с низким poll_timeout, чтобы избежать любого сбоя. В качестве альтернативы, у вас может быть основная задача или отдельная задача после очистки, помечающая остальные датчики в DAG как успешные.
  2. В зависимости от того, сколько существует возможных путей, если их не слишком много, то, возможно, просто имеется один датчик задачи, который проходит по путям для проверки изменений. Как только один путь проходит проверку, вы можете вернуться, чтобы датчик прошел успешно. В противном случае продолжайте опрос, если путь не пройден.

В любом случае вам все равно придется планировать этот DAG часто / без остановки, если вы хотите продолжать слушать новые файлы. В общем, Airflow на самом деле не предназначен для длительных процессов. Если логи основной задачи c проще выполнить с помощью Airflow, вы все равно можете рассмотреть возможность изменения внешнего монитора процесса, но тогда вызовет DAG через API или CLI, содержащий основную задачу.

Также не уверен, применимо ли это здесь или что-то, что вы уже рассмотрели, но вас могут заинтересовать Уведомления о событиях S3 , чтобы более подробно узнать об измененных файлах или каталогах, которые затем могут быть использованы SQSSensor .

...