Я пытаюсь использовать потоковый поток данных для чтения из PubSub и записи в другой PubSub. Я использую python 3.7.3 версию. Конвейер выглядит примерно так:
lines = (pipe | "Read from PubSub" >> beam.io.ReadFromPubSub(topic=TOPIC).with_output_types(bytes)
| "Transformation" >> beam.ParDo(PubSubToDict())
| "Write to PubSub" >> beam.io.WriteToPubSub(topic=OUTPUT, with_attributes=False)
)
Шаг "Преобразование" - это то, что мне нужно для некоторого пользовательского преобразования. Я гарантирую, что выходные данные этого преобразования будут байтами. Примерно так:
class PubSubToDict(beam.DoFn):
def process(self, element):
"""pubsub input is a byte string"""
data = element.decode('utf-8')
"""do some custom transform here"""
data = data.encode('utf-8')
return data
Теперь, когда я публикую sh тестовое сообщение, я получаю такую ошибку:
ERROR: Data being published to Pub/Sub must be sent as a bytestring. [while running 'Write to PubSub']
Мне удалось решить эту проблему, вернув вместо этого массив вот так
return [data]
Но я не знаю причину, почему это сработало. Поэтому я искал объяснение этому.
С уважением, Прасад