Денормализуйте файл GCS перед загрузкой в ​​BigQuery - PullRequest
0 голосов
/ 08 января 2020

Я написал API Cloud Run в. Net Ядро, которое считывает файлы из местоположения GCS и затем должно денормализовать (т.е. добавить больше информации для каждой строки, чтобы включить текстовые описания), а затем записать это в таблицу BigQuery. , У меня есть два варианта:

  1. Мой API запуска в облаке может создавать денормализованные файлы CSV и записывать их в другое местоположение GCS. Затем другой API запуска в облаке может взять эти денормализованные CSV-файлы и записать их прямо в BigQuery.
  2. Мой API запуска в облаке может прочитать исходный файл CSV, денормализовать его в памяти (файловый поток) и затем каким-то образом записать из потока файлов в памяти прямо в таблицу BigQuery.

Как лучше всего написать в BigQuery в этом сценарии, если моя цель - производительность (скорость) и стоимость (денежная). Эти файлы примерно 10 КБ каждый перед денормализацией. Каждая строка примерно 1000 символов. После денормализации это примерно в три раза больше. Мне не нужно хранить денормализованные файлы после их успешной загрузки в BigQuery. Я обеспокоен производительностью, а также какими-либо определенными c BigQuery ежедневными квотами на вставки / записи. Я не думаю, что есть, если вы не делаете заявления DML, но поправьте меня, если я ошибаюсь.

1 Ответ

0 голосов
/ 31 января 2020

Я бы использовал облачные функции, которые запускаются при загрузке файла в корзину.

Очень часто у Google есть репо учебное пособие для JSON файлов Потоковая передача данных из облачного хранилища в BigQuery с использованием облачных функций .

Затем я бы изменил пример файла main.py из:

def streaming(data, context):
    '''This function is executed whenever a file is added to Cloud Storage'''
    bucket_name = data['bucket']
    file_name = data['name']
    db_ref = DB.document(u'streaming_files/%s' % file_name)
    if _was_already_ingested(db_ref):
        _handle_duplication(db_ref)
    else:
        try:
            _insert_into_bigquery(bucket_name, file_name)
            _handle_success(db_ref)
        except Exception:
            _handle_error(db_ref)

На это, который принимает CSV-файлы :

import json
import csv
import logging
import os
import traceback
from datetime import datetime

from google.api_core import retry
from google.cloud import bigquery
from google.cloud import storage
import pytz



PROJECT_ID = os.getenv('GCP_PROJECT')
BQ_DATASET = 'fromCloudFunction'
BQ_TABLE = 'mytable'

CS = storage.Client()
BQ = bigquery.Client()


def streaming(data, context):
    '''This function is executed whenever a file is added to Cloud Storage'''
    bucket_name = data['bucket']
    file_name = data['name']

    newRows = postProcessing(bucket_name, file_name)

    # It is recommended that you save 
    # what you process for debugging reasons.
    destination_bucket = 'post-processed' # gs://post-processed/
    destination_name = file_name
    # saveRowsToBucket(newRows,destination_bucket,destination_name)
    rowsInsertIntoBigquery(newRows)



class BigQueryError(Exception):
    '''Exception raised whenever a BigQuery error happened''' 

    def __init__(self, errors):
        super().__init__(self._format(errors))
        self.errors = errors

    def _format(self, errors):
        err = []
        for error in errors:
            err.extend(error['errors'])
        return json.dumps(err)

def postProcessing(bucket_name, file_name):
    blob = CS.get_bucket(bucket_name).blob(file_name)
    my_str = blob.download_as_string().decode('utf-8')
    csv_reader = csv.DictReader(my_str.split('\n'))                                                                   
    newRows = []
    for row in csv_reader:
        modified_row = row # Add your logic
        newRows.append(modified_row)
    return newRows

def rowsInsertIntoBigquery(rows):
    table = BQ.dataset(BQ_DATASET).table(BQ_TABLE)
    errors = BQ.insert_rows_json(table,rows)
    if errors != []:
        raise BigQueryError(errors)

Это будет по-прежнему необходимо определить вашу карту (row-> newRow) и функцию saveRowsToBucket, если вам это нужно.

...