Вот пример кода, который вы можете использовать
- Чтение сообщений из Pub / Sub
from fastavro import parse_schema, schemaless_reader
messages = (p
| beam.io.ReadFromPubSub(
subscription=known_args.input_subscription)
.with_output_types(bytes))
Используйте пакет Fastavro для определения схемы и считывателя через определение класса
class AvroReader:
def __init__(self, schema):
self.schema = schema
def deserialize(self, record):
bytes_reader = io.BytesIO(record)
dict_record = schemaless_reader(bytes_reader, self.schema)
return dict_record
Теперь отобразите байтовые элементы и укажите схему
schema = avro.schema.parse(open("avro.avsc", "rb").read())
avroReader = AvroReader(schema)
lines = messages | "decode" >> beam.Map(lambda input: avroReader.deserialize(input))
Строки должны иметь PCollection
в форме Avro.