У нас есть конвейер данных для получения данных от брокера mosquitto с использованием Python Paho Client . Нам удалось получить данные protobuf, как показано ниже:
1.0.0/LOC/SPOT_GOB/xxxx-xxxx/GPB_LOCR 0 b'\n+\n\txxxx-xxxx\x12\x0c6CC7ECA59000\x1d\xb8~\xe0c%\x008\x98C(\x020\xb0\xe5\xac\xe8x05'
....
Протобуф поступает непрерывно. Эта проблема возникает, когда мы используем скрипт python для анализа данных.
Содержимое моего файла protobuf:
syntax = "proto2";
Package location;
message Locations {
extensions 1001 to max;
// An outlier is defined when x = y = 0
message Location {
optional string venue_id = 1; // ID of the Venue
optional string mac = 2; // Detected/Located device MAC address
optional float x = 3; // x-coordinates of the detected device (possible to be 0)
optional float y = 4; // y-coordinates of the detected device (possible to be 0)
optional uint32 floor_number = 5; // positive integer of the floor number where the device is detected
optional uint32 timestamp = 6; // Unix timestamp when the device is detected (even when it is an outlier)
}
repeated Location locations = 1;
}
Содержимое файла my location_pb2.py:
import sys
_b=sys.version_info[0]<3 and (lambda x:x) or (lambda x:x.encode('latin1'))
from google.protobuf import descriptor as _descriptor
from google.protobuf import message as _message
from google.protobuf import reflection as _reflection
from google.protobuf import symbol_database as _symbol_database
from google.protobuf import descriptor_pb2
# @@protoc_insertion_point(imports)
_sym_db = _symbol_database.Default()
DESCRIPTOR = _descriptor.FileDescriptor(
name='location.proto',
package='wifi_location',
syntax='proto2',
serialized_pb=_b('\n\x0elocation.proto\x12\rwifi_location\"\xb6\x01\n\tLocations\x12\x34\n\tlocations\x18\x01 \x03(\x0b\x32!.wifi_location.Locations.Location\x1ah\n\x08Location\x12\x10\n\x08venue_id\x18\x01 \x01(\t\x12\x0b\n\x03mac\x18\x02 \x01(\t\x12\t\n\x01x\x18\x03 \x01(\x02\x12\t\n\x01y\x18\x04 \x01(\x02\x12\x14\n\x0c\x66loor_number\x18\x05 \x01(\r\x12\x11\n\ttimestamp\x18\x06 \x01(\r*\t\x08\xe9\x07\x10\x80\x80\x80\x80\x02')
)
_LOCATIONS_LOCATION = _descriptor.Descriptor(
name='Location',
full_name='wifi_location.Locations.Location',
filename=None,
file=DESCRIPTOR,
containing_type=None,
fields=[
_descriptor.FieldDescriptor(
name='venue_id', full_name='wifi_location.Locations.Location.venue_id', index=0,
number=1, type=9, cpp_type=9, label=1,
has_default_value=False, default_value=_b("").decode('utf-8'),
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
options=None),
_descriptor.FieldDescriptor(
name='mac', full_name='wifi_location.Locations.Location.mac', index=1,
number=2, type=9, cpp_type=9, label=1,
has_default_value=False, default_value=_b("").decode('utf-8'),
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
options=None),
_descriptor.FieldDescriptor(
name='x', full_name='wifi_location.Locations.Location.x', index=2,
number=3, type=2, cpp_type=6, label=1,
has_default_value=False, default_value=float(0),
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
options=None),
_descriptor.FieldDescriptor(
name='y', full_name='wifi_location.Locations.Location.y', index=3,
number=4, type=2, cpp_type=6, label=1,
has_default_value=False, default_value=float(0),
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
options=None),
_descriptor.FieldDescriptor(
name='floor_number', full_name='wifi_location.Locations.Location.floor_number', index=4,
number=5, type=13, cpp_type=3, label=1,
has_default_value=False, default_value=0,
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
options=None),
_descriptor.FieldDescriptor(
name='timestamp', full_name='wifi_location.Locations.Location.timestamp', index=5,
number=6, type=13, cpp_type=3, label=1,
has_default_value=False, default_value=0,
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
options=None),
],
extensions=[
],
nested_types=[],
enum_types=[
],
options=None,
is_extendable=False,
syntax='proto2',
extension_ranges=[],
oneofs=[
],
serialized_start=101,
serialized_end=205,
)
_LOCATIONS = _descriptor.Descriptor(
name='Locations',
full_name='wifi_location.Locations',
filename=None,
file=DESCRIPTOR,
containing_type=None,
fields=[
_descriptor.FieldDescriptor(
name='locations', full_name='wifi_location.Locations.locations', index=0,
number=1, type=11, cpp_type=10, label=3,
has_default_value=False, default_value=[],
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
options=None),
],
extensions=[
],
nested_types=[_LOCATIONS_LOCATION, ],
enum_types=[
],
options=None,
is_extendable=True,
syntax='proto2',
extension_ranges=[(1001, 536870912), ],
oneofs=[
],
serialized_start=34,
serialized_end=216,
)
_LOCATIONS_LOCATION.containing_type = _LOCATIONS
_LOCATIONS.fields_by_name['locations'].message_type = _LOCATIONS_LOCATION
DESCRIPTOR.message_types_by_name['Locations'] = _LOCATIONS
_sym_db.RegisterFileDescriptor(DESCRIPTOR)
Locations = _reflection.GeneratedProtocolMessageType('Locations', (_message.Message,), dict(
Location = _reflection.GeneratedProtocolMessageType('Location', (_message.Message,), dict(
DESCRIPTOR = _LOCATIONS_LOCATION,
__module__ = 'location_pb2'
# @@protoc_insertion_point(class_scope:wifi_location.Locations.Location)
))
,
DESCRIPTOR = _LOCATIONS,
__module__ = 'location_pb2'
# @@protoc_insertion_point(class_scope:wifi_location.Locations)
))
_sym_db.RegisterMessage(Locations)
_sym_db.RegisterMessage(Locations.Location)
# @@protoc_insertion_point(module_scope
Мой скрипт на питоне:
From location_pb2 import Locations
Def on_message(mqtt, obj, msg):
W = Locations()
RW = W.SerializeToString()
Print(RW)
A = W.ParseFromString(RW)
Print (A)
конечный результат анализа, как показано ниже:
b ''
0
b ''
0
b ''
....
правильно проанализированные данные должны выглядеть как venue_id, mac, x, y, номер этажа, отметка времени .