Как экспортировать таблицы в файлы со значениями полей в качестве имен файлов вместо частей - PullRequest
0 голосов
/ 17 сентября 2018

У меня очень большая таблица, полученная из большого запроса через Google Cloud Storage.

Одним из полей в таблице является ZipCode (00000).

в любом случае существует для запросатаблица по почтовому индексу и экспорт результатов в файлы, где почтовый индекс является именем файла.каждый файл будет содержать записи этого почтового индекса.

Можно ли это сделать?

Ответы [ 2 ]

0 голосов
/ 23 сентября 2018

Вот решение на python, для которого я не использовал экспорт BigQuery. Тем не менее конечный результат сохраняется в хранилище как файлы json с разделителями новой строки (такие, которые затем можно загрузить обратно в BigQuery). Это включает запрос, который может стоить дорого для очень больших таблиц. В качестве примера я использовал таблицу с одним столбцом ZipCode и еще двумя столбцами (col1, col2), но это не должно иметь значения. Кроме того, я жестко запрограммировал часть аутентификации.

#!/usr/bin/python

from argparse import ArgumentParser
from google.cloud import bigquery
from google.cloud import storage

def main(project_id, dataset_id, table_id, bucket_name):

    client = bigquery.Client.from_service_account_json('service_account.json',project=project_id)
    dataset = client.dataset(dataset_id)
    # Create a table for intermediate results
    table_ref = client.dataset(dataset_id).table('tmp')

    # Query job with 'tmp' as destination
    # Group by non grouped/aggregated field ZipCode using ARRAY_AGG
    job_config = bigquery.QueryJobConfig()
    job_config.destination = table_ref
    sql = 'SELECT ZipCode, ARRAY_AGG(STRUCT(col1, col2)) FROM `{}.{}.{}` GROUP BY ZipCode'.format(project_id, dataset_id, table_id)
    query_job = client.query(
        sql,
        location='US',
        job_config=job_config)
    query_job.result()

    storage_client = storage.Client()
    bucket = storage_client.get_bucket(bucket_name)

    rows = client.list_rows(client.get_table(table_ref))
    for row in rows:
        record=''
        # Rest of row is a list of dictionaries with unicode items
        for r in row[1:][0]:
            r = {str(k):str(v) for k,v in r.items()}
            record+=(str(r))+'\n'
        # row[0] will have ZipCode which we want to use to name the exported files
        filename=row[0]+'.json'
        blob = bucket.blob(filename)
        print 'Exporting to gs://{}/{}'.format(bucket_name,filename)
        blob.upload_from_string(record)

    # Delete the tmp table
    client.delete_table(table_ref)

if __name__ == '__main__':
    parser = ArgumentParser()
    parser.add_argument('-p','--project', help="project where the ZipCode table resides", dest='project_id')
    parser.add_argument('-d','--dataset', help="dataset with the ZipCode table", dest='dataset_id')
    parser.add_argument('-t','--table', help="ZipCode table", dest='table_id')
    parser.add_argument('-b','--bucket', help="destination bucket", dest='bucket')

    args = parser.parse_args()
    main(args.project_id,args.dataset_id,args.table_id,args.bucket)
0 голосов
/ 18 сентября 2018

Я бы использовал некоторые Java и Beam / Dataflow DynamicDestination.

Ваш конвейер будет:

  • Чтение данных из BigQuery.
  • Для каждой строки посмотрите наконкретный столбец, в котором указывается, к какому целевому файлу он должен обращаться.

Взгляните на:

Вопрос задан для Python, но в настоящее время DynamicDestination доступны только на Java.

...