Конвейер потоковой передачи Beam не записывает файлы в корзину - PullRequest
1 голос
/ 17 июня 2020

UI имеет python конвейер потоковой передачи в GCP Dataflow, который считывает тысячи сообщений из PubSub, например:

    with beam.Pipeline(options=pipeline_options) as p:
      lines = p | "read" >> ReadFromPubSub(topic=str(job_options.inputTopic))
      lines = lines | "decode" >> beam.Map(decode_message)
      lines = lines | "Parse" >> beam.Map(parse_json)
      lines = lines | beam.WindowInto(beam.window.FixedWindows(1*60))
      lines = lines | "Add device id key" >> beam.Map(lambda elem: (elem.get('id'), elem))
      lines = lines | "Group by key" >> beam.GroupByKey()
      lines = lines | "Abandon key" >> beam.Map(flatten)
      lines | "WriteToAvro" >> beam.io.WriteToAvro(job_options.outputLocation, schema=schema, file_name_suffix='.avro', mime_type='application/x-avro')

конвейер работает нормально, за исключением того, что он никогда не производит никакого вывода. Есть идеи, почему?

1 Ответ

2 голосов
/ 19 июня 2020

Похоже, с вашим кодом было несколько проблем. Во-первых, были некоторые плохо отформатированные данные в отношении null / None (вы уже исправили) и int / float (упоминается в комментариях). Наконец, преобразование WriteToAvro не может записывать неограниченные PCollections. Существует обходной путь, в котором вы определяете новый приемник и используете его с преобразованием WriteToFiles , которое может записывать неограниченные PCollections.

Обратите внимание, что с На момент написания этой публикации (2020-06-18) этот метод не работает с Apache Beam Python SDK <= 2.23. Это связано с тем, что средство выбора Python не может десериализовать обработанную схему Avro (см. <a href="https://issues.apache.org/jira/browse/BEAM-6522" rel="nofollow noreferrer"> BEAM-6522 ). В этом случае это заставляет решение использовать вместо этого FastAvro. Вы можете использовать Avro, если вручную обновляете укроп до> = 0.3.1.1 и Avro до> = 1.9.0, но будьте осторожны, так как это в настоящее время не проверено.

С учетом предостережений, вот обходной путь:

from apache_beam.io.fileio import FileSink
from apache_beam.io.fileio import WriteToFiles
import fastavro

class AvroFileSink(FileSink):
    def __init__(self, schema, codec='deflate'):
        self._schema = schema
        self._codec = codec

    def open(self, fh):
        # This is called on every new bundle.
        self.writer = fastavro.write.Writer(fh, self._schema, self._codec)

    def write(self, record):
        # This is called on every element.
        self.writer.write(record)

    def flush(self):
        self.writer.flush()

Эта новая раковина используется следующим образом:

import apache_beam as beam

# Replace the following with your schema.
schema = fastavro.schema.parse_schema({
    'name': 'row',
    'namespace': 'test',
    'type': 'record',
    'fields': [
        {'name': 'a', 'type': 'int'},
    ],
})

# Create the sink. This will be used by the WriteToFiles transform to write
# individual elements to the Avro file.
sink = AvroFileSink(schema=schema)

with beam.Pipeline(...) as p:
    lines = p | beam.ReadFromPubSub(...)
    lines = ...

    # This is where your new sink gets used. The WriteToFiles transform takes
    # the sink and uses it to write to a directory defined by the path 
    # argument.
    lines | WriteToFiles(path=job_options.outputLocation, sink=sink)
...