Python Потоковое поведение потока данных «WriteToPubSub» поведение - PullRequest
1 голос
/ 08 апреля 2020

Я пытаюсь использовать потоковый поток данных для чтения из PubSub и записи в другой PubSub. Я использую python 3.7.3 версию. Конвейер выглядит примерно так:

lines = (pipe | "Read from PubSub" >> beam.io.ReadFromPubSub(topic=TOPIC).with_output_types(bytes)
    | "Transformation" >> beam.ParDo(PubSubToDict())
    | "Write to PubSub" >> beam.io.WriteToPubSub(topic=OUTPUT, with_attributes=False)
    )

Шаг "Преобразование" - это то, что мне нужно для некоторого пользовательского преобразования. Я гарантирую, что выходные данные этого преобразования будут байтами. Примерно так:

class PubSubToDict(beam.DoFn):
    def process(self, element):
        """pubsub input is a byte string"""
        data = element.decode('utf-8')
        """do some custom transform here"""
        data = data.encode('utf-8')
        return data

Теперь, когда я публикую sh тестовое сообщение, я получаю такую ​​ошибку:

ERROR: Data being published to Pub/Sub must be sent as a bytestring. [while running 'Write to PubSub']

Мне удалось решить эту проблему, вернув вместо этого массив вот так

return [data]

Но я не знаю причину, почему это сработало. Поэтому я искал объяснение этому.

С уважением, Прасад

1 Ответ

1 голос
/ 08 апреля 2020

Это сработало, потому что ParDo позволяет шагу конвейера возвращать несколько выходных элементов для одного элемента ввода, поэтому он ожидает, что итерация будет возвращена.

вы также можете сделать yield data

...