Похоже, с вашим кодом было несколько проблем. Во-первых, были некоторые плохо отформатированные данные в отношении null / None (вы уже исправили) и int / float (упоминается в комментариях). Наконец, преобразование WriteToAvro не может записывать неограниченные PCollections. Существует обходной путь, в котором вы определяете новый приемник и используете его с преобразованием WriteToFiles , которое может записывать неограниченные PCollections.
Обратите внимание, что с На момент написания этой публикации (2020-06-18) этот метод не работает с Apache Beam Python SDK <= 2.23. Это связано с тем, что средство выбора Python не может десериализовать обработанную схему Avro (см. <a href="https://issues.apache.org/jira/browse/BEAM-6522" rel="nofollow noreferrer"> BEAM-6522 ). В этом случае это заставляет решение использовать вместо этого FastAvro. Вы можете использовать Avro, если вручную обновляете укроп до> = 0.3.1.1 и Avro до> = 1.9.0, но будьте осторожны, так как это в настоящее время не проверено.
С учетом предостережений, вот обходной путь:
from apache_beam.io.fileio import FileSink
from apache_beam.io.fileio import WriteToFiles
import fastavro
class AvroFileSink(FileSink):
def __init__(self, schema, codec='deflate'):
self._schema = schema
self._codec = codec
def open(self, fh):
# This is called on every new bundle.
self.writer = fastavro.write.Writer(fh, self._schema, self._codec)
def write(self, record):
# This is called on every element.
self.writer.write(record)
def flush(self):
self.writer.flush()
Эта новая раковина используется следующим образом:
import apache_beam as beam
# Replace the following with your schema.
schema = fastavro.schema.parse_schema({
'name': 'row',
'namespace': 'test',
'type': 'record',
'fields': [
{'name': 'a', 'type': 'int'},
],
})
# Create the sink. This will be used by the WriteToFiles transform to write
# individual elements to the Avro file.
sink = AvroFileSink(schema=schema)
with beam.Pipeline(...) as p:
lines = p | beam.ReadFromPubSub(...)
lines = ...
# This is where your new sink gets used. The WriteToFiles transform takes
# the sink and uses it to write to a directory defined by the path
# argument.
lines | WriteToFiles(path=job_options.outputLocation, sink=sink)