Одним из способов решения этой проблемы является добавление оператора датчика в рабочий процесс.
Nehil Jain хорошо описывает датчики :
Датчики - это особый тип оператора воздушного потока, который будет работать до тех пор, пока не будет соблюден определенный критерий. Например, вы знаете, что файл поступит в вашу корзину S3 в течение определенного периода времени, но точное время, когда файл прибывает, является несовместимым.
Для вашего случая использования, похоже, есть Google Cloud Sensor , который "проверяет наличие файла в Google Cloud Storage". Причина, по которой вы бы включили датчик, заключается в том, что вы отделяете операцию «определить, существует ли файл» от операции «получить файл (и что-то с ним сделать)».
По умолчанию датчики имеютдва метода ( source ):
- poke : код, выполняемый
poke_interval
раз, который проверяет, выполняется ли условие - execute : используйте метод
poke
для проверки условия по расписанию, определенному poke_interval
;происходит сбой при достижении аргумента timeout
В обычном датчике обнаружения файлов оператор получает инструкции для проверки источника для файла по расписанию (например, проверяет каждые 5 минут до3 часа, чтобы увидеть, если файл существует). Если датчик успешно выполняет свои условия тестирования, он успешно и позволяет DAG продолжать вниз по течению к следующему оператору (операторам). Если ему не удается найти файл, время ожидания истекает, и оператор датчика помечается как неисправный.
С помощью только оператора датчика вам уже удалось разделить случаи ошибок - сбой DAG на GoogleCloudStorageObjectSensor
вместо GoogleCloudStorageToBigQueryOperator
, когда файл не существует, и сбой в GoogleCloudStorageToBigQueryOperator
, когда что-то не так с логикой передачи. Важно, что для вашего случая использования Airflow поддерживает аргумент soft_fail
, который «помечает [s] задачу как Пропущенную при сбое»
В этой следующей части я буду предупреждать эту следующую часть, явно указав, что яЯ не очень знаком с операторами GoogleCloudStorage
. Если оператор не разрешает использование подстановочных знаков в датчике, вам может потребоваться изменить метод датчика poke
, чтобы обеспечить более сложное обнаружение файлов на основе шаблонов. Именно здесь архитектура плагинов Airflow действительно может сиять, позволяя вам модифицировать и расширять существующих операторов в соответствии с вашими потребностями.
Пример, который я приведу здесь, состоит в том, что SFTPSensor
поддерживает только поиск определенного файла из коробки. Мне нужно было использовать подстановочные знаки, поэтому я написал плагин, который модифицирует SFTPSensor
для поддержки регулярных выражений в идентификации файлов. В моем случае это было просто изменение poke
, чтобы переключиться с опроса на наличие одного файла на опрос списка файлов, а затем передать его через регулярное выражение для фильтрации списка.
Вбеглый взгляд, похоже, что GoogleCloudStorageSensor
тыкает объект с помощью метода hook.exists
. Я не могу говорить о том, будет ли подстановочный знак работать там, но если это не так, похоже, что есть метод hook.list
, который позволит вам реализовать рабочий процесс, аналогичный тому, что я сделал дляSFTPRegexSensor
.
Я включил некоторые исходные коды для метода poke
плагина SFTPRegexSensor, модифицированного так, как я думаю, будет работать с GCS в случае, если это будет полезно:
def poke(self, context):
# create a hook (removed some of the SSH/SFTP intricacies for simplicity)
# get list of file(s) matching regex
files = hook.list(self.bucket, self.prefix) # you need to define operator paramters for the choices that are dynamic in the operator's poke (e.g. which bucket, what the file prefix is); swapped in the GCS args
regex = re.compile(self.remote_filename)
files = list(filter(regex.search, files))
if not files:
return False
return True