Я пытаюсь создать автоматизированный процесс загрузки данных из Google Cloud Storage и писать в Google BigQuery, используя CloudFunctions с Python / pandas.
Я разработал код, который работает в Google Colab. Скрипт загружает в BQ и перемещает файлы в вложенные области. Однако, когда я пытаюсь переместить этот код в функции Google Cloud, происходит сбой при попытке записи в BQ:
from google.cloud import storage
import re
import pandas as pd
from io import BytesIO
import pandas_gbq as gbq
from datetime import datetime
from google.cloud import bigquery as bq
project_id = 'project_id'
# BQ
destination_table = 'Dataset.Table'
fail_blob = "failure/"
success_blob = "success/"
# GCS
trigger_bucket = 'bucket'
pattern = re.compile(r"\w+(.csv)")
re_date = re.compile('([0-9]+-[0-9]+-[0-9]+)')
#def write_to_bq(data="", context=""):
# storage
storage_client = storage.Client(project=project_id)
# storage_client = storage.Client()
bucket = storage_client.get_bucket(trigger_bucket)
blobs = bucket.list_blobs()
for blob in blobs:
if '/' not in blob.name and 'risks' in blob.name: # only looks in trigger_bucket
fn = blob.name # fn - filename
try:
print('---> Processing:\t %s' % fn)
dt = re_date.search(fn).group(1) # date from fn
csv = blob.download_as_string() # load csv as string
# Create df
df = pd.read_csv(BytesIO(csv), low_memory=False) # create df
df.columns = [x.replace(' ', '_') for x in df.columns] # remove spaces in columns
df['dt'] = dt # create column with date from file name
# Create schema
schema = [{'name': 'initiator', 'type': 'STRING'},
{'name': 'owner', 'type': 'STRING'},
{'name': 'title', 'type': 'STRING'},
{'name': 'date', 'type': 'STRING'},
]
# rename columns
df.columns = ['initiator', 'owner', 'title', 'date']
# Append data to BQ - make sure there are the same columns
df.to_gbq(destination_table, project_id,
if_exists='append', table_schema=schema)
# when uploaded move to success
bucket.rename_blob(blob, success_blob + fn )
except:
print('\t\tFailed %s' % fn)
# when failed
bucket.rename_blob(blob, fail_blob + fn )
Итак, проблема в строке: df.to_gbq (destination_table, project_id, if_exists = 'append', table_schema = schema) Однако я не знаю, как его заменить, или мне нужно добавить что-то еще здесь.
Код должен использовать pandas, так как я изменяю содержимое файла перед загрузкой его в BQ.
Мои требования.txt:
google-cloud-storage==1.13.0
google-cloud-bigquery
pandas==0.23.4
pandas-gbq==0.8.0
В качестве альтернативы Я мог бы сохранить обратно в GCS. Который может быть использован другой CloudFunction для загрузки в BQ.