Почему пользовательский объект Python нельзя использовать с ParDo Fn? - PullRequest
1 голос
/ 24 апреля 2019

В настоящее время я новичок в использовании Apache Beam в Python с бегуном Dataflow.Я заинтересован в создании пакетного конвейера, который публикуется в Google Cloud PubSub, я поработал с Beam Python API и нашел решение.Однако во время моих исследований я столкнулся с некоторыми интересными проблемами, которые вызвали у меня любопытство.

1.Успешный конвейер

В настоящее время мой успешный конвейер лучей для публикации данных в пакетном режиме из GCS выглядит следующим образом:

class PublishFn(beam.DoFn):
    def __init__(self, topic_path):
        self.topic_path = topic_path
        super(self.__class__, self).__init__()

    def process(self, element, **kwargs):
        from google.cloud import pubsub_v1
        publisher = pubsub_v1.PublisherClient()
        future = publisher.publish(self.topic_path, data=element.encode("utf-8"))
        return future.result()


def run_gcs_to_pubsub(argv):
    options = PipelineOptions(flags=argv)

    from datapipes.common.dataflow_utils import CsvFileSource
    from datapipes.protos import proto_schemas_pb2
    from google.protobuf.json_format import MessageToJson

    with beam.Pipeline(options=options) as p:
        normalized_data = (
                p |
                "Read CSV from GCS" >> beam.io.Read(CsvFileSource(
                    "gs://bucket/path/to/file.csv")) |
                "Normalize to Proto Schema" >> beam.Map(
                        lambda data: MessageToJson(
                            proto_schemas_pb2(data, proto_schemas_pb2.MySchema()),
                            indent=0,
                            preserving_proto_field_name=True)
                    )
        )
        (normalized_data |
            "Write to PubSub" >> beam.ParDo(
                    PublishFn(topic_path="projects/my-gcp-project/topics/mytopic"))
            )

2.Неудачные конвейеры

Здесь я попытался сделать публикацию доступной через DoFn.Я пытался использовать следующие методы:

a.Инициализация издателя в DoFn

class PublishFn(beam.DoFn):
    def __init__(self, topic_path):
        from google.cloud import pubsub_v1

        batch_settings = pubsub_v1.types.BatchSettings(
             max_bytes=1024,  # One kilobyte
             max_latency=1,  # One second
         )
        self.publisher = pubsub_v1.PublisherClient(batch_settings)
        self.topic_path = topic_path
        super(self.__class__, self).__init__()

    def process(self, element, **kwargs):
        future = self.publisher.publish(self.topic_path, data=element.encode("utf-8"))
        return future.result()

def run_gcs_to_pubsub(argv):
    ... ## same as 1

b.Инициализируйте Publisher вне DoFn и передайте его в DoFn

class PublishFn(beam.DoFn):
    def __init__(self, publisher, topic_path):
        self.publisher = publisher
        self.topic_path = topic_path
        super(self.__class__, self).__init__()

    def process(self, element, **kwargs):
        future = self.publisher.publish(self.topic_path, data=element.encode("utf-8"))
        return future.result()


def run_gcs_to_pubsub(argv):
    .... ## same as 1

    batch_settings = pubsub_v1.types.BatchSettings(
        max_bytes=1024,  # One kilobyte
        max_latency=1,  # One second
    )
    publisher = pubsub_v1.PublisherClient(batch_settings)

    with beam.Pipeline(options=options) as p:
        ... # same as 1
        (normalized_data | 
            "Write to PubSub" >> beam.ParDo(
                PublishFn(publisher=publisher, topic_path="projects/my-gcp-project/topics/mytopic"))
        )

Обе попытки сделать издателя общим для всех методов DoFn не увенчались успехом со следующими сообщениями об ошибках:

  File "stringsource", line 2, in grpc._cython.cygrpc.Channel.__reduce_cython__

и

  File "stringsource", line 2, in grpc._cython.cygrpc.Channel.__reduce_cython__
TypeError: no default __reduce__ due to non-trivial __cinit__

Мои вопросы будут:

  1. Улучшит ли реализация общего издателя производительность конвейера луча?Если да, то я хотел бы изучить это решение.

  2. Почему возникают ошибки на моих неисправных конвейерах?Это из-за инициализации и передачи объекта пользовательского класса в DoFn вне функции process?Если это связано с этим, как я могу реализовать конвейер таким образом, чтобы я мог повторно использовать пользовательский объект в DoFn?

Спасибо, ваша помощь будет принята с благодарностью.

Редактировать: решение

Хорошо, поэтому Анкур объяснил, почему возникает моя проблема, и обсудил, как выполняется сериализация на DoFn.Основываясь на этих знаниях, я теперь понимаю, что есть два решения для создания или повторного использования пользовательского объекта в DoFn:

  1. Сделать пользовательский объект Serializable: это позволяет объекту быть инициализированным / доступнымпри создании объекта DoFn (под __init__).Этот объект должен быть сериализуемым, так как он будет сериализован во время передачи конвейера, в котором будет создан объект DoFn (который вызывает __init__).Как вы можете достичь этого, ответьте ниже в моем ответе.Кроме того, я обнаружил, что это требование на самом деле связано с Beam Documentation в [1] [2].

  2. Инициализация несериализуемых объектов в функциях DoFn вне __init__, чтобы избежать сериализации, посколькуфункции вне init не вызываются во время отправки конвейера.Как вы можете это сделать, объясняется в ответе Анкура.

Ссылки:

[1] https://beam.apache.org/documentation/programming-guide/#core-beam-transforms

[2] https://beam.apache.org/documentation/programming-guide/#requirements-for-writing-user-code-for-beam-transforms

Ответы [ 2 ]

3 голосов
/ 24 апреля 2019

PublisherClient не может быть правильно травлен. Подробнее о травлении здесь . Инициализация PublisherClient в методе process позволяет избежать травления PublisherClient.

Если целью является повторное использование PublisherClient, я бы рекомендовал инициализировать PublisherClient в методе процесса и сохранить его в self, используя следующий код.

class PublishFn(beam.DoFn):
    def __init__(self, topic_path):
        self.topic_path = topic_path
        super(self.__class__, self).__init__()

    def process(self, element, **kwargs):
        if not hasattr(self, 'publish'):
            from google.cloud import pubsub_v1
            self.publisher = pubsub_v1.PublisherClient()
        future = self.publisher.publish(self.topic_path, data=element.encode("utf-8"))
        return future.result()
1 голос
/ 24 апреля 2019

Благодаря Анкуру я обнаружил, что эта проблема связана с проблемой травления в python.Затем я попытался изолировать проблему, решив сначала проблему посадки PublisherClient, и нашел решение для совместного использования PublisherClient через DoFn на Beam.

В python мы можем засечь пользовательский объект с помощью dill пакет, и я понял, что этот пакет уже используется в реализации Python Beam для выбора объектов.Поэтому я попытался устранить проблему и обнаружил эту ошибку:

TypeError: no default __reduce__ due to non-trivial __cinit__

Затем я попытался исправить эту ошибку, и мой конвейер теперь работает!

НижеВот решение:

class PubsubClient(PublisherClient):
    def __reduce__(self):
        return self.__class__, (self.batch_settings,)

# The DoFn to perform on each element in the input PCollection.
class PublishFn(beam.DoFn):
    def __init__(self, topic_path):
        self.topic_path = topic_path

        from google.cloud import pubsub_v1
        batch_settings = pubsub_v1.types.BatchSettings(
            max_bytes=1024,  # One kilobyte
            max_latency=1,  # One second
        )

        self.publisher = PubsubClient(batch_settings=batch_settings)
        super(self.__class__, self).__init__()

    def process(self, element, **kwargs):
        future = self.publisher.publish(topic=self.topic_path, data=element.encode("utf-8"))

        return future.result()

# ...the run_gcs_to_pubsub is the same as my successful pipeline

Решение работает следующим образом: во-первых, я создаю подкласс из PublisherClient и сам реализую функцию __reduce__.Обратите внимание, что поскольку я использовал только свойство batch_settings для инициализации PublisherClient, этого свойства достаточно для моей функции __reduce__.Затем я использовал этот модифицированный PublisherClient для моего DoFn в __init__.

Надеюсь, с этим новым решением мой конвейер получит улучшение производительности.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...