Запись данных из pubsub в bigtable с помощью облачных функций - PullRequest
0 голосов
/ 01 декабря 2019

Я новичок в облачной большой таблице и у меня большие проблемы с использованием облачных функций записи данных из pub / sub в bigtable.

Облачные функции получают сообщения от pubsub, но проблема в следующем шаге,запись в bigtable.

Сообщение создается в скрипте Python и отправляется в pub / sub.

Один пример сообщения:

b '{"ЭДА": 2.015176, "температура": 33,39, "БВП": - 0,49, "x_acc": - 36,0, "y_acc": - 38,0, "z_acc": - 128,0, "heart_rate": 83,78, "iddevice":15.0, "timestamp": "2019-12-01T20: 01: 36.927Z"} '

Для записи в большую таблицу я создал таблицу:

 from google.cloud import bigtable 
 from google.cloud.bigtable import column_family

 client = bigtable.Client(project="projectid", admin=True) 
 instance = client.instance("bigtableinstance")
 table = instance.table("bigtable1")
 print('Creating the {} table.'.format(table)) 
 print('Creating columnfamily cf1 with Max Version GC rule...')
 max_versions_rule = column_family.MaxVersionsGCRule(2)
 column_family_id = 'cf1'
 column_families = {column_family_id: max_versions_rule}
 if not table.exists():
     table.create(column_families=column_families)
     print("Table {} is created.".format(table)) 
 else:
     print("Table {} already exists.".format(table))

Это работаетбез проблем.

Теперь я попытался написать сообщение через pub / sub в bigtable со следующим кодом python в облачных функциях, используя метод main:

import json
import base64
import os
from google.cloud import bigtable
from google.cloud.bigtable import column_family, row_filters


project_id = os.environ.get('projetid', 'UNKNOWN')
INSTANCE = 'bigtableinstance'
TABLE = 'bigtable1'

client = bigtable.Client(project=project_id, admin=True)
instance = client.instance(INSTANCE)

colFamily = "cf1"
def writeToBigTable(table, data):
#    Parameters row_key (bytes) – The key for the row being created.
#    Returns A row owned by this table.
        row_key = data[colFamily]['iddevice'].value.encode()
        row = table.row(row_key)
        for colFamily in data.keys():
            for key in data[colFamily].keys():
                row.set_cell(colFamily,
                                        key,
                                        data[colFamily][key])
        table.mutate_rows([row])
        return data

def selectTable():
    stage = os.environ.get('stage', 'dev')
    table_id = TABLE + stage
    table = instance.table(table_id)
    return table


def main(event, context):
    data = base64.b64decode(event['data']).decode('utf-8')
    print("DATA: {}".format(data))
    eda, temperature, bvp, x_acc, y_acc, z_acc, heart_rate, iddevice, timestamp = data.split(',')

    table = selectTable()

    data = {'eda': eda,
         'temperature': temperature,
         'bvp': bvp,
         'x_acc':x_acc,
         'y_acc':y_acc,
         'z_acc':z_acc,
         'heart_rate':heart_rate,
         'iddevice':iddevice,
         'timestamp':timestamp}
    writeToBigTable(table, data)
    print("Data Written: {}".format(data))

Я пробовал разные версии, но не могунайти решение.

Спасибо за помощь.

Всего наилучшего

Доминик

Ответы [ 2 ]

2 голосов
/ 01 декабря 2019

Я думаю, что эта строка неверна:

    row_key = data[colFamily]['iddevice'].value.encode()

Вы передаете объект данных, но у него нет свойства 'cf1'. Вы также не должны кодировать это. Попробуйте:

    row_key = data['iddevice']

Ваш цикл for также будет иметь ту же проблему. Я думаю, что это то, что вы хотите вместо этого

    for col in data.keys():
        row.set_cell(colFamily, key, data[key])

Кроме того, я знаю, что вы просто играете с ним, но использование идентификатора устройства в качестве единственного значения для ключа строки будет плохо. Рекомендуется комбинировать ключ строки и дату или одно из ваших других свойств (в зависимости от вашего запроса) и использовать его в качестве ключа строки. Есть документ по Cloud Bigtable схема , который полезен, и codelab , использующий более реалистичный пример набора данных и рассматривающий, как выбрать схему для этого примера. Это на Java, но вы все равно можете импортировать данные и выполнять свои собственные запросы.

0 голосов
/ 02 декабря 2019

сначала большое спасибо за помощь.

Я пытался исправить это с помощью кода, который есть, но, к сожалению, сейчас он не работает из-за других ошибок.

AttributeError: у объекта 'DirectRow' нет атрибута 'append'

Я предполагаю, что это находится в следующей строке кода

        row.set_cell(colFamily,
                     key,
                     data[key])

Я мог бы предположить, что источник ошибок находится вразделение строки «data»

eda, temperature, bvp, x_acc, y_acc, z_acc, heart_rate, iddevice, timestamp = data.split(',')

Например, eda будет выглядеть так:

"'eda':2.015176"

, что выглядит довольно неправильно для меня.

Особенно, когда я вставляюэто в следующем тексте:

 data = {'eda': eda,....}

Ошибка

AttributeError: у объекта 'DirectRow' нет атрибута. 'append', похоже, говорит о том, что существует проблема с даннымиЯ хочу обработать с set_cell. Существует указанная set_cell со строкой в ​​виде списка или любой другой итерируемый экземпляр прямой строки. Разве это не подходит для этого?

Я попробовал обходной путь со списком, но это, кажется, делает его еще хуже.

client = bigtable.Client(project=project_id, admin=True)
instance = client.instance(INSTANCE)

colFamily = "cf1"
def writeToBigTable(table, dat):

    row_key = "{}-{}".format(dat[16], dat[17])
    row = table.row(row_key)
    for n in range(len(dat)):
        row.set_cell(colFamily,
                     dat[n],
                     dat[n+9])
    table.mutate_rows([row])
    return dat

def selectTable():
    stage = os.environ.get('stage', 'dev')
    table_id = TABLE + stage
    table = instance.table(table_id)
    return table


def main(event, context):
    data = base64.b64decode(event['data']).decode('utf-8')
    print("DATA: {}".format(data))
    var_1, eda, var_2, temperature, var_3, bvp, var_4, x_acc, var_5, y_acc, var_6, z_acc, var_7, heart_rate, var_8, iddevice, var_9, timestamp = data.replace(':',',').split(',')

    table = selectTable(); dat = [var_1, var_2, var_3, var_4, var_5, var_6, var_7, var_8, var_9, eda, temperature, bvp, x_acc, y_acc, z_acc, heart_rate, iddevice, timestamp]; 

#   data = {'eda': eda,
#         'temperature': temperature,
#         'bvp': bvp,
#         'x_acc':x_acc,
#         'y_acc':y_acc,
#         'z_acc':z_acc,
#         'heart_rate':heart_rate,
#         'iddevice':iddevice,
#         'timestamp':timestamp}
    writeToBigTable(table, dat)
    print("Data Written: {}".format(data))

Я очень сильно застрял в этой проблеме и не имею дальнейших идей, какрешить это.

...