Я новичок в Apache Beam и пытаюсь написать свой первый конвейер на Python для вывода данных из подписки Google Pub / Sub на плоские файлы для дальнейшего использования;в идеале я хочу собирать их в файл, скажем, каждые полчаса.У меня есть следующий код в качестве окончательного преобразования в моем конвейере: -
| 'write output' >> WriteToText('TestNewPipeline.txt')
Однако все созданные файлы находятся в каталоге с префиксом "beam-temp-TestNewPipeline.txt- [somehash]" и пакетируютсяна группы по 10 человек, чего я не ожидал.
Я пытался поиграть с оконной функцией, но, похоже, она не имела большого эффекта, поэтому либо я совершенно не понимаю концепцию, либо делаю что-то совершенно неверное.
код для окна: -
| 'Window' >> beam.WindowInto(beam.window.FixedWindows(5))
Я предполагал, что это приведет к записи в текстовый файл в статическом пятисекундном окне, но это не так.
Fullкод ниже: -
options = PipelineOptions()
options.view_as(StandardOptions).streaming=True
def format_message(message, timestamp=beam.DoFn.TimestampParam):
formatted_message = {
'data': message.data,
'attributes': str(message.attributes),
'timestamp': float(timestamp)
}
return formatted_message
with beam.Pipeline(options=options) as p:
(p
| 'Read From Pub Sub' >> ReadFromPubSub(subscription='projects/[my proj]/subscriptions/[my subscription]',with_attributes=True)
| 'Window' >> beam.WindowInto(beam.window.FixedWindows(5))
| 'Map Message' >> beam.Map(format_message)
| 'write output' >> WriteToText('TestNewPipeline.txt')
)
result = p.run()
Как и ожидалось, процесс запускается бесконечно и успешно читает сообщения из подписки;однако он записывает их только в временные файлы луча.Кто-нибудь может помочь указать, где я иду не так?
Обновление:
Следуя комментариям Джейсона, я немного изменил конвейер: -
class AddKeyToDict(beam.DoFn):
def process(self, element):
return [(element['rownumber'], element)]
with beam.Pipeline(options=options) as p:
(p
| 'Read From Pub Sub' >> ReadFromPubSub(subscription=known_args.input_subscription)# can't make attributes work as yet! ,with_attributes=True)
# failed attempt 1| 'Map Message' >> beam.Map(format_message)
# failed attempt 2| 'Parse JSON' >> beam.Map(format_message_element)
| 'Parse to Json' >> beam.Map(lambda x: json.loads(x))
| 'Add key' >> beam.ParDo(AddKeyToDict())
| 'Window' >> beam.WindowInto(beam.window.FixedWindows(5), trigger=AfterProcessingTime(15), accumulation_mode=AccumulationMode.DISCARDING)
| 'Group' >> beam.GroupByKey()
| 'write output' >> WriteToText(known_args.output_file)
)
Я еще не смог извлечь message_id или опубликованное время из PubSub, поэтому я просто использую число, сгенерированное в моем сообщении.На данный момент я все еще получаю только временные файлы, созданные, и ничего не накапливается в окончательный файл?Начинаю задумываться, не все ли немного не хватает реализации Python, и мне придется взять Java ....