Bigquery: создайте таблицу, если она не существует, и загрузите данные, используя Python и Apache AirFlow - PullRequest
1 голос
/ 26 марта 2019

Сначала я получаю все данные, используя запрос MySQL, из производственной базы данных, затем сохраняю эти данные как NEW LINE DELIMITED JSON в облачном хранилище Google, что я хочу сделать:
1.проверьте, существует ли таблица
2.если таблица не существует, создайте таблицу, используя схему автоопределения
3.хранить данные

Все это будет запланировано в воздушном потоке.Что меня действительно смутило, так это число 2, как мне это сделать на Python?или Airflow может делать это автоматически?

Ответы [ 2 ]

1 голос
/ 26 марта 2019

Воздушный поток может сделать это автоматически. Параметр create_disposition создает таблицу при необходимости. А параметр autodetect делает именно то, что вам нужно. Это для Воздушный поток 1.10.2 .

GCS_to_BQ = GoogleCloudStorageToBigQueryOperator(
    task_id='gcs_to_bq',
    bucket='test_bucket',
    source_objects=['folder1/*.csv', 'folder2/*.csv'],
    destination_project_dataset_table='dest_table',
    source_format='CSV',
    create_disposition='CREATE_IF_NEEDED',
    write_disposition='WRITE_TRUNCATE',
    bigquery_conn_id='bq-conn',
    google_cloud_storage_conn_id='gcp-conn',
    autodetect=True, # This uses autodetect
    dag=dag
)
1 голос
/ 26 марта 2019

Из командной строки BigQuery, если ваш json-файл находится в GCS, тогда Загрузка данных JSON с автоматическим определением схемы делает 2 + 3 для вас в одной команде.

Глядя на документ AirFlow, GoogleCloudStorageToBigQueryOperator похоже делает то же самое. Я проверил источник , он просто вызывает BigQuery load api . Я верю, что это будет делать то, что вы хотите.

Если неясно, что означает каждый аргумент, вы можете выполнить поиск BigQuery Jobs api , используя имя аргумента.

Например, чтобы достичь 1 в вашем списке задач, вам нужно только указать:

write_disposition (string) - Расположение записи, если таблица уже существует.

Но чтобы узнать, какую строку нужно передать как write_disposition, вам нужно выполнить поиск в документе BigQuery. enter image description here

...