gsutil notification create -t data-test-notifications -f json gs://vikct001-test-bucket
gcloud pubsub subscriptions create data-test-subscription --topic data-test-notifications
gcloud functions deploy pubsub_to_bigquery --region us-central1 --runtime python37 --trigger-topic data-test-notifications --source gs://pubsub_to_bigquery-bucket/test-code.zip
У меня есть простой файл csv в корзине gs: // vikct001-test-bucket, и я настроил уведомления для pubsub topi c, которые затем необходимо обработать с помощью облачной функции и вставить в таблицу bigquery .
Я имел в виду ссылку ниже для своего тематического исследования: https://medium.com/@milosevic81 / copy-data-from-pub-sub-to-bigquery-496e003228a1
с использованием ниже команды для создания уведомления / подписки и развертывания облачной функции.
gsutil notification create -t data-test-notifications -f json gs://vikct001-test-bucket
gcloud pubsub subscriptions create data-test-subscription --topic data-test-notifications
gcloud functions deploy pubsub_to_bigquery --region us-central1 --runtime python37 --trigger-topic data-test-notifications --source gs://pubsub_to_bigquery-bucket/test-code.zip
Вот мой Python код:
from google.cloud import bigquery
import base64, json, sys, os
def pubsub_to_bigquery(event, context):
print("event:",event)
print("context:",context)
pubsub_message = base64.b64decode(event['data']).decode('utf-8')
incoming_data = json.loads(pubsub_message)
print("incoming data:",incoming_data)
write_to_bigquery(os.environ['my_dataset'], os.environ['my_table'], incoming_data)
def write_to_bigquery(dataset, table, document):
bigquery_client = bigquery.Client()
dataset_ref = bigquery_client.dataset(dataset)
table_ref = dataset_ref.table(table)
table = bigquery_client.get_table(table_ref)
errors = bigquery_client.insert_rows(table, [document])
if errors != [] :
print(errors, file=sys.stderr)
else:
print("New rows have been added to big query table.")
Облачная функция запускается, когда я нажимаю csv файл в корзину, но при этом вставляются нули в таблицу bigquery.
ниже - это то, что я скопировал из файла журнала.
входящие данные: {'kind': 'storage # object', 'id' : 'vikct001-test-bucket / input_test_records.csv / 1590839144279480', 'selfLink': 'https://www.googleapis.com/storage/v1/b/vikct001-test-bucket/o/input_test_records.csv', 'name': 'input_test_records.csv', 'bucket': 'vikct001-test- ведро ',' поколение ':' 1590839144279480 ',' метагенерация ':' 1 ',' contentType ':' текст / csv ',' timeCr eated ':' 2020-05-30T11: 45: 44.279Z ',' updated ':' 2020-05-30T11: 45: 44.279Z ',' storageClass ':' СТАНДАРТНЫЙ ',' timeStorageClassUpdated ':' 2020-05- 30T11: 45: 44.279Z ',' size ':' 99 ',' md5Ha sh ':' + gsELw4UmqWR / kUZyO2gSg == ',' mediaLink ':' https://www.googleapis.com/download/storage/v1/b/vikct001-test-bucket/o/input_test_records.csv?generation=1590839144279480&alt=media ',' crc32 c ':' zKnBPQ == ',' etag ':' CLjb29DB2 + kCEAE = '}