Как сделать общий Protobuf Parser DoFn в Python Beam? - PullRequest
1 голос
/ 26 апреля 2019

Контекст
Я работаю с потоковым конвейером, у которого есть источник данных protobuf в pubsub.Я хочу разобрать этот protobuf в python dict, потому что приемник данных требует, чтобы входные данные были набором dicts.Я успешно разработал Protobuf Parser, инициализировав сообщение protobuf в функции process DoFn.

Зачем нужен универсальный парсер Protobuf

Однако я хотел знать, возможно ли сделать универсальный DotoFn ProtobufParser на Beam?Универсальный DoFn полезен с технической точки зрения, чтобы избежать повторной реализации существующих функций и обеспечения возможности повторного использования кода.Я знаю, что в Java мы можем использовать универсальные шаблоны, поэтому реализовать этот универсальный ProtobufParser в Java относительно просто.Поскольку функции Python являются объектами первого класса, я подумал, можно ли передать класс схемы Protobuf (не объект экземпляра сообщения) в DoFn.Я пытался сделать это, однако продолжал терпеть неудачу.

Успешный синтаксический анализатор с предупреждением: не обобщается

Ниже приведен мой текущий успешный синтаксический анализатор protobuf.Сообщение protobuf инициализируется внутри функции process.

class ParsePubSubProtoToDict(beam.DoFn):

    def process(self, element, *args, **kwargs):
        from datapipes.protos.data_pb2 import DataSchema
        from google.protobuf.json_format import MessageToDict

        message = DataSchema()
        message.ParseFromString(element)

        obj = MessageToDict(message, preserving_proto_field_name=True)

        yield obj

Хотя хорошо, что вышеупомянутый анализатор Protobuf DoFn работает, он не распространяется на все схемы protobuf, поэтому это приведет к необходимости повторной реализации нового анализатора DoFn для другой схемы protobuf.

Мои попытки

Чтобы сделать синтаксический анализатор универсальным для всех схем protobuf, я попытался передать схему protobuf, которая генерируется в Python как класс, в DoFn.

class ParsePubSubProtoToDict(beam.DoFn):
    def __init__(self, proto_class):
        self.proto_class = proto_class

    def process(self, element, *args, **kwargs):
        from google.protobuf.json_format import MessageToDict

        message = self.proto_class()
        message.ParseFromString(element)
        obj = MessageToDict(message, preserving_proto_field_name=True)

        yield obj


def run_pubsub_to_gbq_pipeline(argv):
    ...
    from datapipes.protos import data_pb2

    with beam.Pipeline(options=options) as p:
        (p |
         'Read from PubSub' >> beam.io.ReadFromPubSub(subscription=pubsub_config["subscription"]) |
         'Proto to JSON' >> beam.ParDo(ParsePubSubProtoToDict(data_pb2.DataSchema().__class__)) |
         'Print Result' >> beam.Map(lambda x: print_data(x))

и другие подобные методы, однако все мои попытки терпят неудачу с одним и тем же сообщением об ошибке: pickle.PicklingError: Can't pickle <class 'data_pb2.DataSchema'>: it's not found as data_pb2.DataSchema

Из этого сообщения об ошибке я выдвинул две гипотезы о причине возникновения проблемы:

  1. Класс схемы Protobuf недоступен для сериализации.Однако, эта гипотеза, вероятно, неверна, потому что, хотя я знаю, pickle не может сериализовать схему protobuf, если я использовал dill, я смог сериализовать схему protobuf.Но кроме этого, я все еще немного не уверен в том, как DoFn в Python Beam реализует сериализацию (например: когда он использует dill или pickle для сериализации вещей, каков сериализованный формат объекта, чтобы сделать его сериализуемым и совместимымс DoFn и т. д.)

  2. Ошибка импорта в классе DoFn.Я столкнулся с несколькими проблемами Error Error с python beam из-за области действия функции / класса и рабочих потоков данных, чтобы решить эту проблему, мне пришлось импортировать пакет локально в функцию, где это необходимо, а не глобально в модуле.Поэтому, может быть, если мы передадим класс схемы protobuf в DoFn, импорт схемы фактически будет выполнен вне DoFn, следовательно, DoFn не сможет правильно разрешить имя класса внутри DoFn?


Мои вопросы будут такими:

  1. Почему возникает эта ошибка и как ее устранить?
  2. Можно ли передать класс схемы protobuf?Или есть лучший способ для реализации универсального protobuf для анализатора Python dict DoFn без передачи класса схемы protobuf в DoFn?
  3. Как работает DoFn в Python, как я могу гарантировать, что объект, который передается в DoFnсоздание (__init__) сериализуемо?Есть ли в луче класс Serializable, в котором я мог бы наследовать, чтобы преобразовать свои несериализуемые объекты в сериализуемые?

Большое спасибо!Ваша помощь будет принята с благодарностью.

1 Ответ

1 голос
/ 26 апреля 2019

Я действительно нашел альтернативное решение для создания универсального парсера Protobuf с beam.Map

def convert_proto_to_dict(data, schema_class):
    message = schema_class()

    if isinstance(data, (str, bytes)):
        message.ParseFromString(data)
    else:
        message = data

    return MessageToDict(message, preserving_proto_field_name=True)


def run_pubsub_to_gbq_pipeline(argv):
    ... options initialization
    from datapipes.protos import data_pb2

    with beam.Pipeline(options=options) as p:
        (p |
         'Read from PubSub' >> beam.io.ReadFromPubSub(subscription=pubsub_config["subscription"]) |
         'Proto to Dict' >> beam.Map(lambda x: convert_proto_to_dict(x, data_pb2.DataSchema)) |
         'Print Result' >> beam.Map(lambda x: print_data(x))

Итак, сначала я создал функцию, которая получает класс схемы protobuf и данные protobuf (в настоящее время в байтовых строках) в качестве аргумента. Эта функция инициализирует и анализирует строковые байтовые данные в сообщение protobuf и преобразует сообщение protobuf в словарь python.

Эта функция затем используется beam.Map, так что теперь я смог разработать общий анализатор Protobuf для луча без beam.DoFn. Однако мне все еще интересно, почему класс схемы protobuf проблематичен при использовании с DoFn, поэтому, если вы знаете, почему и как это решить, поделитесь здесь своим ответом, спасибо!

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...