Воздушный поток от CSV до BigQuery с изменениями схемы - PullRequest
0 голосов
/ 19 марта 2020

Фон

Мне нужно спроектировать конвейер Airflow для загрузки CSV в BigQuery.

Я знаю, что у CSV часто меняются схемы. После загрузки первого файла схема может иметь вид

id | ps_1 | ps_1_value

, когда второй файл приземляется, и я загружаю его, он может выглядеть как

id | ps_1 | ps_1_value | ps_1 | ps_2_value.

Вопрос

Как лучше всего с этим справиться?


Моя первая мысль при подходе к этому будет

  1. Загрузить второй файл
  2. Сравнить схему с текущей таблицей
  3. Обновить таблицу, добавив два столбца (ps_2, ps_2_value)
  4. Вставить новые строки

Я бы сделать это в PythonOperator.

Если файл 3 появляется и выглядит как id | ps_2 | ps_2_value Я бы заполнил пропущенные столбцы и вставил.

Спасибо за отзыв.

Ответы [ 2 ]

0 голосов
/ 19 марта 2020

После загрузки двух предыдущих файлов example_data_1.csv и example_data_2.csv я вижу, что поля вставляются в правильные столбцы, а новые столбцы добавляются по мере необходимости.

Редактировать: момент лампочки понял, что schema_update_options существует. Смотрите здесь: https://googleapis.dev/python/bigquery/latest/generated/google.cloud.bigquery.job.SchemaUpdateOption.html

csv_to_bigquery = GoogleCloudStorageToBigQueryOperator(
    task_id='csv_to_bigquery',
    google_cloud_storage_conn_id='google_cloud_default',
    bucket=airflow_bucket,
    source_objects=['data/example_data_3.csv'],
    skip_leading_rows=1,
    bigquery_conn_id='google_cloud_default',    
    destination_project_dataset_table='{}.{}.{}'.format(project, schema, table),
    source_format='CSV',
    create_disposition='CREATE_IF_NEEDED',
    write_disposition='WRITE_APPEND',
    schema_update_options=['ALLOW_FIELD_RELAXATION', 'ALLOW_FIELD_ADDITION'],
    autodetect=True,
    dag=dag
)

enter image description here

0 голосов
/ 19 марта 2020

По сути, рекомендуемый конвейер для вашего случая состоит в создании временной таблицы для обработки ваших новых данных. * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * *

  1. Загрузка нового файла во временную таблицу
  2. Сравнение схемы фактической таблицы и схемы временной таблицы.
  3. Выполнение запроса для перемещения данных из временной таблицы в фактическую таблицу , Если во временной таблице появились новые поля, добавьте их в фактическую таблицу с помощью параметра schema_update_options. Кроме того, если ваша фактическая таблица имеет поля в режиме NULLABLE, она сможет легко справиться с отсутствующими столбцами, если в ваших новых данных есть пропущенное поле.
  4. Удалить временную таблицу
  5. Если вы Вы используете GCS, переместите ваш файл в другое ведро или каталог.

Наконец, я хотел бы указать несколько ссылок, которые могут быть вам полезны:

  1. Документация AirFlow (операторы BigQuery)
  2. Статья , в которой описана проблема, аналогичная вашей, и где вы можете найти некоторые из упомянутых сведений.

Надеюсь, это поможет

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...