У меня есть некоторые задачи, которые нужно запускать, когда один из немногих определенных файлов или каталогов изменяется на S3.
Допустим, у меня есть PythonOperator, и он должен запускаться, если /path/file.csv изменяется или если /path/nested_path/some_other_file.csv изменится.
Я попытался создать динамические c KeySensors, как это:
trigger_path_list = ['/path/file.csv', '//path/nested_path/some_other_file.csv']
for trigger_path in trigger_path_list:
file_sensor_task = S3KeySensor(
task_id=get_sensor_task_name(trigger_path),
poke_interval=30,
timeout=60 * 60 * 24 * 8,
bucket_key=os.path.join('s3://', s3_bucket_name, trigger_path),
wildcard_match=True)
file_sensor_task >> main_task
Однако, это будет означать, что оба S3KeySensors должны быть запущен для того, чтобы он был обработан. Я также попытался сделать обе задачи уникальными, как здесь:
for trigger_path in trigger_path_list:
main_task = PythonOperator(
task_id='{}_task_triggered_by_{}'.format(dag_name, trigger_path),
...)
file_sensor_task = S3KeySensor(
task_id=get_sensor_task_name(trigger_path),
poke_interval=30,
timeout=60 * 60 * 24 * 8,
bucket_key=os.path.join('s3://', s3_bucket_name, trigger_path),
wildcard_match=True)
file_sensor_task >> main_task
Однако это будет означать, что DAG не завершит работу sh, если все файлы из списка не появятся. Поэтому, если /path/file.csv появляется 2 раза подряд, он не будет запущен во второй раз, так как эта часть DAG будет завершена.
Нет способа передать несколько файлов к S3KeySensor? Я не хочу создавать один DAG для каждого пути, так как для меня это будет 40 DAGS на 5 путей, что дает около 200 DAG.
Есть идеи?