AIRFLOW DAG для загрузки файлов не удается, когда файлов нет - PullRequest
0 голосов
/ 09 октября 2019

У меня есть DAG с воздушным потоком, который отлично работает, когда файлы присутствуют, но error-> не удается, когда исходных файлов нет.

Случайным образом я получаю файлы из определенного источника, которые моя DAG забираети процессы. Хотя мне нужно ежедневно запускать DAG, файлы не обязательно должны быть там ежедневно. Это может быть понедельник, среда или даже воскресенье вечером.

Меня не волнуют дни без новых файлов. Я беспокоюсь о днях, когда приходят новые файлы и они ломаются.

Как мне сообщить DAG, что если файла не существует, то он корректно завершается успешно? Моя DAG ниже (пожалуйста, игнорируйте настройку расписания. Я все еще в режиме разработки):

import airflow
from airflow import models
from airflow.operators.gcs_to_bq import GoogleCloudStorageToBigQueryOperator
from airflow.operators.gcs_to_gcs import GoogleCloudStorageToGoogleCloudStorageOperator

args = {
    'owner': 'Airflow',
    'start_date': airflow.utils.dates.days_ago(2),
    'email': ['email@gmail.com'],
    'email_on_failure': True,
    'schedule_interval': 'None',
}

dag = models.DAG(
    dag_id='Source1_Ingestion',
    default_args=args
)

# [START load ATTOM File to STAGING]
load_File_to_Source1_RAW = GoogleCloudStorageToBigQueryOperator(
    task_id='Source1_GCS_to_GBQ_Raw',
    bucket='Source1_files',
    source_objects=['To_Process/*.txt'],
    destination_project_dataset_table='Source1.Source1_RAW',
    schema_fields=[
        {'name': 'datarow', 'type': 'STRING', 'mode': 'NULLABLE'},
    ],
    field_delimiter='§',
    write_disposition='WRITE_TRUNCATE',
    google_cloud_storage_conn_id='GCP_EDW_Staging',
    bigquery_conn_id='GCP_EDW_Staging',
    dag=dag)
# [END howto_operator_gcs_to_bq]

# [START move files to Archive]
archive_attom_files = GoogleCloudStorageToGoogleCloudStorageOperator(
    task_id='Archive_Source1_Files',
    source_bucket='Source1_files',
    source_object='To_Process/*.txt',
    destination_bucket='Source1_files',
    destination_object='Archive/',
    move_object=True,
    google_cloud_storage_conn_id='GCP_EDW_Staging',
    dag=dag
)

# [END move files to archive]

load_File_to_Source1_RAW.set_downstream(archive_Source1_files)

1 Ответ

0 голосов
/ 13 октября 2019

Одним из способов решения этой проблемы является добавление оператора датчика в рабочий процесс.

Nehil Jain хорошо описывает датчики :

Датчики - это особый тип оператора воздушного потока, который будет работать до тех пор, пока не будет соблюден определенный критерий. Например, вы знаете, что файл поступит в вашу корзину S3 в течение определенного периода времени, но точное время, когда файл прибывает, является несовместимым.

Для вашего случая использования, похоже, есть Google Cloud Sensor , который "проверяет наличие файла в Google Cloud Storage". Причина, по которой вы бы включили датчик, заключается в том, что вы отделяете операцию «определить, существует ли файл» от операции «получить файл (и что-то с ним сделать)».

По умолчанию датчики имеютдва метода ( source ):

  • poke : код, выполняемый poke_interval раз, который проверяет, выполняется ли условие
  • execute : используйте метод poke для проверки условия по расписанию, определенному poke_interval;происходит сбой при достижении аргумента timeout

В обычном датчике обнаружения файлов оператор получает инструкции для проверки источника для файла по расписанию (например, проверяет каждые 5 минут до3 часа, чтобы увидеть, если файл существует). Если датчик успешно выполняет свои условия тестирования, он успешно и позволяет DAG продолжать вниз по течению к следующему оператору (операторам). Если ему не удается найти файл, время ожидания истекает, и оператор датчика помечается как неисправный.

С помощью только оператора датчика вам уже удалось разделить случаи ошибок - сбой DAG на GoogleCloudStorageObjectSensorвместо GoogleCloudStorageToBigQueryOperator, когда файл не существует, и сбой в GoogleCloudStorageToBigQueryOperator, когда что-то не так с логикой передачи. Важно, что для вашего случая использования Airflow поддерживает аргумент soft_fail, который «помечает [s] задачу как Пропущенную при сбое»

В этой следующей части я буду предупреждать эту следующую часть, явно указав, что яЯ не очень знаком с операторами GoogleCloudStorage. Если оператор не разрешает использование подстановочных знаков в датчике, вам может потребоваться изменить метод датчика poke, чтобы обеспечить более сложное обнаружение файлов на основе шаблонов. Именно здесь архитектура плагинов Airflow действительно может сиять, позволяя вам модифицировать и расширять существующих операторов в соответствии с вашими потребностями.

Пример, который я приведу здесь, состоит в том, что SFTPSensor поддерживает только поиск определенного файла из коробки. Мне нужно было использовать подстановочные знаки, поэтому я написал плагин, который модифицирует SFTPSensor для поддержки регулярных выражений в идентификации файлов. В моем случае это было просто изменение poke, чтобы переключиться с опроса на наличие одного файла на опрос списка файлов, а затем передать его через регулярное выражение для фильтрации списка.

Вбеглый взгляд, похоже, что GoogleCloudStorageSensor тыкает объект с помощью метода hook.exists. Я не могу говорить о том, будет ли подстановочный знак работать там, но если это не так, похоже, что есть метод hook.list, который позволит вам реализовать рабочий процесс, аналогичный тому, что я сделал дляSFTPRegexSensor.

Я включил некоторые исходные коды для метода poke плагина SFTPRegexSensor, модифицированного так, как я думаю, будет работать с GCS в случае, если это будет полезно:

def poke(self, context): 
    # create a hook (removed some of the SSH/SFTP intricacies for simplicity)
    # get list of file(s) matching regex 
    files = hook.list(self.bucket, self.prefix) # you need to define operator paramters for the choices that are dynamic in the operator's poke (e.g. which bucket, what the file prefix is); swapped in the GCS args 
    regex = re.compile(self.remote_filename) 
    files = list(filter(regex.search, files)) 

    if not files: 
        return False 
    return True
...