В настоящее время я новичок в использовании Apache Beam в Python с бегуном Dataflow.Я заинтересован в создании пакетного конвейера, который публикуется в Google Cloud PubSub, я поработал с Beam Python API и нашел решение.Однако во время моих исследований я столкнулся с некоторыми интересными проблемами, которые вызвали у меня любопытство.
1.Успешный конвейер
В настоящее время мой успешный конвейер лучей для публикации данных в пакетном режиме из GCS выглядит следующим образом:
class PublishFn(beam.DoFn):
def __init__(self, topic_path):
self.topic_path = topic_path
super(self.__class__, self).__init__()
def process(self, element, **kwargs):
from google.cloud import pubsub_v1
publisher = pubsub_v1.PublisherClient()
future = publisher.publish(self.topic_path, data=element.encode("utf-8"))
return future.result()
def run_gcs_to_pubsub(argv):
options = PipelineOptions(flags=argv)
from datapipes.common.dataflow_utils import CsvFileSource
from datapipes.protos import proto_schemas_pb2
from google.protobuf.json_format import MessageToJson
with beam.Pipeline(options=options) as p:
normalized_data = (
p |
"Read CSV from GCS" >> beam.io.Read(CsvFileSource(
"gs://bucket/path/to/file.csv")) |
"Normalize to Proto Schema" >> beam.Map(
lambda data: MessageToJson(
proto_schemas_pb2(data, proto_schemas_pb2.MySchema()),
indent=0,
preserving_proto_field_name=True)
)
)
(normalized_data |
"Write to PubSub" >> beam.ParDo(
PublishFn(topic_path="projects/my-gcp-project/topics/mytopic"))
)
2.Неудачные конвейеры
Здесь я попытался сделать публикацию доступной через DoFn
.Я пытался использовать следующие методы:
a.Инициализация издателя в DoFn
class PublishFn(beam.DoFn):
def __init__(self, topic_path):
from google.cloud import pubsub_v1
batch_settings = pubsub_v1.types.BatchSettings(
max_bytes=1024, # One kilobyte
max_latency=1, # One second
)
self.publisher = pubsub_v1.PublisherClient(batch_settings)
self.topic_path = topic_path
super(self.__class__, self).__init__()
def process(self, element, **kwargs):
future = self.publisher.publish(self.topic_path, data=element.encode("utf-8"))
return future.result()
def run_gcs_to_pubsub(argv):
... ## same as 1
b.Инициализируйте Publisher вне DoFn и передайте его в DoFn
class PublishFn(beam.DoFn):
def __init__(self, publisher, topic_path):
self.publisher = publisher
self.topic_path = topic_path
super(self.__class__, self).__init__()
def process(self, element, **kwargs):
future = self.publisher.publish(self.topic_path, data=element.encode("utf-8"))
return future.result()
def run_gcs_to_pubsub(argv):
.... ## same as 1
batch_settings = pubsub_v1.types.BatchSettings(
max_bytes=1024, # One kilobyte
max_latency=1, # One second
)
publisher = pubsub_v1.PublisherClient(batch_settings)
with beam.Pipeline(options=options) as p:
... # same as 1
(normalized_data |
"Write to PubSub" >> beam.ParDo(
PublishFn(publisher=publisher, topic_path="projects/my-gcp-project/topics/mytopic"))
)
Обе попытки сделать издателя общим для всех методов DoFn
не увенчались успехом со следующими сообщениями об ошибках:
File "stringsource", line 2, in grpc._cython.cygrpc.Channel.__reduce_cython__
и
File "stringsource", line 2, in grpc._cython.cygrpc.Channel.__reduce_cython__
TypeError: no default __reduce__ due to non-trivial __cinit__
Мои вопросы будут:
Улучшит ли реализация общего издателя производительность конвейера луча?Если да, то я хотел бы изучить это решение.
Почему возникают ошибки на моих неисправных конвейерах?Это из-за инициализации и передачи объекта пользовательского класса в DoFn вне функции process
?Если это связано с этим, как я могу реализовать конвейер таким образом, чтобы я мог повторно использовать пользовательский объект в DoFn?
Спасибо, ваша помощь будет принята с благодарностью.
Редактировать: решение
Хорошо, поэтому Анкур объяснил, почему возникает моя проблема, и обсудил, как выполняется сериализация на DoFn.Основываясь на этих знаниях, я теперь понимаю, что есть два решения для создания или повторного использования пользовательского объекта в DoFn:
Сделать пользовательский объект Serializable: это позволяет объекту быть инициализированным / доступнымпри создании объекта DoFn (под __init__
).Этот объект должен быть сериализуемым, так как он будет сериализован во время передачи конвейера, в котором будет создан объект DoFn (который вызывает __init__
).Как вы можете достичь этого, ответьте ниже в моем ответе.Кроме того, я обнаружил, что это требование на самом деле связано с Beam Documentation в [1] [2].
Инициализация несериализуемых объектов в функциях DoFn вне __init__
, чтобы избежать сериализации, посколькуфункции вне init не вызываются во время отправки конвейера.Как вы можете это сделать, объясняется в ответе Анкура.
Ссылки:
[1] https://beam.apache.org/documentation/programming-guide/#core-beam-transforms
[2] https://beam.apache.org/documentation/programming-guide/#requirements-for-writing-user-code-for-beam-transforms