Чтение сообщений AVRO из PubSub в потоке данных Python - PullRequest
1 голос
/ 14 апреля 2020

У меня есть требование читать AVRO сообщения из PubSub topi c другого проекта GCP. Ранее я реализовал Python конвейеры потока данных, которые читают JSON сообщения из PubSub и записывают в BigQuery. Но я новичок в обработке сообщений AVRO. Я попытался найти Python документацию для AVRO, и она указывает мне на эту ссылку https://avro.apache.org/docs/current/gettingstartedpython.html

В этой ссылке есть примеры, которые читают из файлов и записывают в файлы, но я не Не думаю, что эти функции будут полезны для чтения из PubSub. Я использую приведенное ниже преобразование для чтения из PubSub, где вывод является байтовой строкой.

"Read from PubSub" >> beam.io.ReadFromPubSub(topic=TOPIC).with_output_types(bytes)

Мне нужен способ чтения этих байтов (формат AVRO)

1 Ответ

2 голосов
/ 15 апреля 2020

Вот пример кода, который вы можете использовать

  1. Чтение сообщений из Pub / Sub
from fastavro import parse_schema, schemaless_reader

messages = (p
            | beam.io.ReadFromPubSub(
                subscription=known_args.input_subscription)
            .with_output_types(bytes))
Используйте пакет Fastavro для определения схемы и считывателя через определение класса
class AvroReader:
    def __init__(self, schema):
        self.schema = schema

    def deserialize(self, record):
        bytes_reader = io.BytesIO(record)
        dict_record = schemaless_reader(bytes_reader, self.schema)
        return dict_record
Теперь отобразите байтовые элементы и укажите схему
schema = avro.schema.parse(open("avro.avsc", "rb").read())
avroReader = AvroReader(schema)

lines = messages | "decode" >> beam.Map(lambda input: avroReader.deserialize(input))

Строки должны иметь PCollection в форме Avro.

...