У меня есть процесс, в котором я жду файл каждую неделю, но этот файл имеет отметку времени в его имени для даты измерения. Итак, я знаю, что у меня что-то будет на этой неделе, и имя может быть от 2020-05-25 * .csv до 2020-05-31 * .csv.
Единственный способ, которым я узнаю, чтобы начать мои процессы с воздушным потоком - запустить датчик при запуске @daily и использовать дату выполнения, чтобы найти, есть ли файл. Дело в том, что, поскольку я не знаю, в какой день будет загружен файл, у меня будет 6 неисправных датчиков, поэтому 6 неудачных DAG-файлов и 1 успешный.
Пример части датчиков SFTP:
with DAG(
"geometrie-sftp-to-safe",
default_args=default_args,
schedule_interval="@daily",
catchup=True,
) as dag:
starting_sensor = DummyOperator(
task_id="starting_sensor"
)
sensor_sftp_A = SFTPSensor(
task_id="sensor_sftp_A",
path="/input/geometrie/prod/Track_Geometry-{{ ds_nodash }}_A.csv",
sftp_conn_id="ssh_ftp_landing",
poke_interval=60,
soft_fail=True,
mode="reschedule"
)
Секунда с GCSSensor
with DAG(
"geometrie-preprocessing",
default_args=default_args,
schedule_interval="@daily",
catchup=True
) as dag:
# File A
sensor_gcs_A = GoogleCloudStorageObjectSensor(
task_id="gcs-sensor_A",
bucket="lisea-mesea-sea-cloud-safe",
object="geometrie/original/track_geometry_{{ ds_nodash }}_A.csv",
google_cloud_conn_id="gcp_conn",
poke_interval=50
)
Вот почему я хотел бы, чтобы DAG-файлы были пропущены, если и только если датчик вышел из строя. Если это что-то еще, я хотел бы настоящего провала.