Проблема Google CloudFunctions при записи pandas кадра данных в Google BigQuery - PullRequest
0 голосов
/ 04 марта 2020

Я пытаюсь создать автоматизированный процесс загрузки данных из Google Cloud Storage и писать в Google BigQuery, используя CloudFunctions с Python / pandas.

Я разработал код, который работает в Google Colab. Скрипт загружает в BQ и перемещает файлы в вложенные области. Однако, когда я пытаюсь переместить этот код в функции Google Cloud, происходит сбой при попытке записи в BQ:

from google.cloud import storage
import re
import pandas as pd
from io import BytesIO
import pandas_gbq as gbq
from datetime import datetime
from google.cloud import bigquery as bq
project_id = 'project_id'

# BQ 
destination_table = 'Dataset.Table'
fail_blob = "failure/"
success_blob = "success/"

# GCS
trigger_bucket = 'bucket'
pattern = re.compile(r"\w+(.csv)")
re_date = re.compile('([0-9]+-[0-9]+-[0-9]+)')
#def write_to_bq(data="", context=""):
# storage
storage_client = storage.Client(project=project_id)
# storage_client = storage.Client()
bucket = storage_client.get_bucket(trigger_bucket)
blobs = bucket.list_blobs()

for blob in blobs:
  if '/' not in blob.name and 'risks' in blob.name:   # only looks in trigger_bucket

  fn = blob.name  # fn - filename
  try:
    print('---> Processing:\t %s' % fn)
    dt = re_date.search(fn).group(1)  # date from fn
    csv = blob.download_as_string()   # load csv as string

    # Create df
    df = pd.read_csv(BytesIO(csv), low_memory=False) # create df
    df.columns = [x.replace(' ', '_') for x in df.columns]  # remove spaces in columns
    df['dt'] = dt                                           # create column with date from file name

    # Create schema

    schema = [{'name': 'initiator', 'type': 'STRING'},
              {'name': 'owner', 'type': 'STRING'},
              {'name': 'title', 'type': 'STRING'},
              {'name': 'date', 'type': 'STRING'},
              ]
    # rename columns
    df.columns = ['initiator', 'owner', 'title', 'date']

    # Append data to BQ - make sure there are the same columns
    df.to_gbq(destination_table, project_id,
              if_exists='append', table_schema=schema)

    # when uploaded move to success
    bucket.rename_blob(blob, success_blob + fn )
  except:
    print('\t\tFailed %s' % fn)
    # when failed
    bucket.rename_blob(blob, fail_blob + fn )

Итак, проблема в строке: df.to_gbq (destination_table, project_id, if_exists = 'append', table_schema = schema) Однако я не знаю, как его заменить, или мне нужно добавить что-то еще здесь.

Код должен использовать pandas, так как я изменяю содержимое файла перед загрузкой его в BQ.

Мои требования.txt:

google-cloud-storage==1.13.0
google-cloud-bigquery
pandas==0.23.4
pandas-gbq==0.8.0

В качестве альтернативы Я мог бы сохранить обратно в GCS. Который может быть использован другой CloudFunction для загрузки в BQ.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...