Контекст
Я работаю с потоковым конвейером, у которого есть источник данных protobuf в pubsub.Я хочу разобрать этот protobuf в python dict, потому что приемник данных требует, чтобы входные данные были набором dicts.Я успешно разработал Protobuf Parser, инициализировав сообщение protobuf в функции process
DoFn.
Зачем нужен универсальный парсер Protobuf
Однако я хотел знать, возможно ли сделать универсальный DotoFn ProtobufParser на Beam?Универсальный DoFn полезен с технической точки зрения, чтобы избежать повторной реализации существующих функций и обеспечения возможности повторного использования кода.Я знаю, что в Java мы можем использовать универсальные шаблоны, поэтому реализовать этот универсальный ProtobufParser в Java относительно просто.Поскольку функции Python являются объектами первого класса, я подумал, можно ли передать класс схемы Protobuf (не объект экземпляра сообщения) в DoFn.Я пытался сделать это, однако продолжал терпеть неудачу.
Успешный синтаксический анализатор с предупреждением: не обобщается
Ниже приведен мой текущий успешный синтаксический анализатор protobuf.Сообщение protobuf инициализируется внутри функции process
.
class ParsePubSubProtoToDict(beam.DoFn):
def process(self, element, *args, **kwargs):
from datapipes.protos.data_pb2 import DataSchema
from google.protobuf.json_format import MessageToDict
message = DataSchema()
message.ParseFromString(element)
obj = MessageToDict(message, preserving_proto_field_name=True)
yield obj
Хотя хорошо, что вышеупомянутый анализатор Protobuf DoFn работает, он не распространяется на все схемы protobuf, поэтому это приведет к необходимости повторной реализации нового анализатора DoFn для другой схемы protobuf.
Мои попытки
Чтобы сделать синтаксический анализатор универсальным для всех схем protobuf, я попытался передать схему protobuf, которая генерируется в Python как класс, в DoFn.
class ParsePubSubProtoToDict(beam.DoFn):
def __init__(self, proto_class):
self.proto_class = proto_class
def process(self, element, *args, **kwargs):
from google.protobuf.json_format import MessageToDict
message = self.proto_class()
message.ParseFromString(element)
obj = MessageToDict(message, preserving_proto_field_name=True)
yield obj
def run_pubsub_to_gbq_pipeline(argv):
...
from datapipes.protos import data_pb2
with beam.Pipeline(options=options) as p:
(p |
'Read from PubSub' >> beam.io.ReadFromPubSub(subscription=pubsub_config["subscription"]) |
'Proto to JSON' >> beam.ParDo(ParsePubSubProtoToDict(data_pb2.DataSchema().__class__)) |
'Print Result' >> beam.Map(lambda x: print_data(x))
и другие подобные методы, однако все мои попытки терпят неудачу с одним и тем же сообщением об ошибке: pickle.PicklingError: Can't pickle <class 'data_pb2.DataSchema'>: it's not found as data_pb2.DataSchema
Из этого сообщения об ошибке я выдвинул две гипотезы о причине возникновения проблемы:
Класс схемы Protobuf недоступен для сериализации.Однако, эта гипотеза, вероятно, неверна, потому что, хотя я знаю, pickle
не может сериализовать схему protobuf, если я использовал dill
, я смог сериализовать схему protobuf.Но кроме этого, я все еще немного не уверен в том, как DoFn в Python Beam реализует сериализацию (например: когда он использует dill
или pickle
для сериализации вещей, каков сериализованный формат объекта, чтобы сделать его сериализуемым и совместимымс DoFn и т. д.)
Ошибка импорта в классе DoFn.Я столкнулся с несколькими проблемами Error Error с python beam из-за области действия функции / класса и рабочих потоков данных, чтобы решить эту проблему, мне пришлось импортировать пакет локально в функцию, где это необходимо, а не глобально в модуле.Поэтому, может быть, если мы передадим класс схемы protobuf в DoFn, импорт схемы фактически будет выполнен вне DoFn, следовательно, DoFn не сможет правильно разрешить имя класса внутри DoFn?
Мои вопросы будут такими:
- Почему возникает эта ошибка и как ее устранить?
- Можно ли передать класс схемы protobuf?Или есть лучший способ для реализации универсального protobuf для анализатора Python dict DoFn без передачи класса схемы protobuf в DoFn?
- Как работает DoFn в Python, как я могу гарантировать, что объект, который передается в DoFnсоздание (
__init__
) сериализуемо?Есть ли в луче класс Serializable, в котором я мог бы наследовать, чтобы преобразовать свои несериализуемые объекты в сериализуемые?
Большое спасибо!Ваша помощь будет принята с благодарностью.