Загрузка данных из группы данных в BigQuery с использованием Python - PullRequest
0 голосов
/ 26 июня 2018

Я использую вызовы API для извлечения данных из службы. Данные представляют собой вложенный Json с массивами, которые также могут содержать объекты Json.

Пример: enter image description here

По сути, я хочу загрузить его в таблицы в BigQuery. Я создал таблицу для каждого массива. Объекты Джейсона распакованы в одной таблице. Например:

Orders: All customer fields, all ShippingAdress, orderDateUtc etc..
Orders_items:  orderid, discountEach, giftTo etc..
Order_items_historicalCategories: ....

Я не уверен, как лучше это сделать. Я могу создавать CSV-файлы из вызова API (data steam), а затем использовать COPY для CSV для их загрузки, но это кажется чрезмерным. Я ищу способ пропустить создание CSV.

Есть ли оператор или пакет, который может обрабатывать эти данные и напрямую загружать их в таблицы? Я предполагаю, что то, что мне нужно сделать, уже было сделано многими другими организациями, но я не видел никакого встроенного метода, чтобы сделать это в документации. https://cloud.google.com/bigquery/docs/loading-data

Любая помощь будет оценена.

1 Ответ

0 голосов
/ 26 июня 2018

Вы в основном должны следовать документации о том, как загружать данные json, используя python , используя вложенные и повторяющиеся поля . Например, используя схему из последней ссылки, вы можете загрузить вложенные и повторные данные JSON следующим образом (вы можете протестировать с найденными образцами данных здесь ):

import sys

def load_nested_json():
    from google.cloud import bigquery
    client = bigquery.Client()

    dataset_id, table_id, uri = sys.argv[1:]
    dataset_ref = client.dataset(dataset_id)
    job_config = bigquery.LoadJobConfig()
    job_config.schema = [
        bigquery.SchemaField('id', 'STRING', mode='NULLABLE'),
        bigquery.SchemaField('first_name', 'STRING', mode='NULLABLE'),
        bigquery.SchemaField('last_name', 'STRING', mode='NULLABLE'),
        bigquery.SchemaField('dob', 'DATE', mode='NULLABLE'),
        bigquery.SchemaField('addresses', 'RECORD', mode='REPEATED', fields=[
            bigquery.SchemaField('status', 'STRING', mode='NULLABLE'),
            bigquery.SchemaField('address', 'STRING', mode='NULLABLE'),
            bigquery.SchemaField('city', 'STRING', mode='NULLABLE'),
            bigquery.SchemaField('state', 'STRING', mode='NULLABLE'),
            bigquery.SchemaField('zip', 'STRING', mode='NULLABLE'),
            bigquery.SchemaField('numberOfYears', 'STRING', mode='NULLABLE'),
        ]),
    ]
    table_ref = dataset_ref.table(table_id)
    # Uncomment following lines to also create the destination table
    # table = bigquery.Table(table_ref, job_config.schema)
    # table = client.create_table(table)

    # print('Created table {}'.format(table.full_table_id))

    job_config.source_format = "NEWLINE_DELIMITED_JSON"

    load_job = client.load_table_from_uri(
        uri,
        table_ref,
        job_config=job_config)  # API request

    assert load_job.job_type == 'load'

    load_job.result()  # Waits for table load to complete.

    assert load_job.state == 'DONE'

if __name__ == '__main__':
    load_nested_json()
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...