Я изучаю концепции Window & Triggering в Apache с целью:
- чтения для неограниченных источников (PubSub)
- запись входящего сообщения на диск localhost каждые 5 сек. ИСПРАВЛЕНО Интервал окна
ВЫПУСК: нет записи, записываемой на диск localhost (конвейер создал папку beam-team- и записал туда несколько файлов)но output.csv в назначенном получателе записывается каждые 5 секунд)
- работает apache-beam == 2.9.0, Python 2.7.10
- Пробовал оба: DirectRunner , а также DataFlowRunner (с GCS Bucket в качестве пункта назначения)
Вот код (заранее большое спасибо за любые советы):
p = beam.Pipeline(runner=None, options=options, argv=None)
"""
#1) Read incoming messages & apply Windowing
"""
lines = p | "read_sub" >> beam.io.gcp.pubsub.ReadFromPubSub(topic=None, subscription=SUBSCRIBER, with_attributes=True) \
"""
#2) Apply 5 sec Windowing
"""
| 'window' >> beam.WindowInto(beam.window.FixedWindows(5))
"""
#3) apply Map() ops
"""
output = lines | "pardo" >> beam.Map(lambda x: x.data)
"""
#4) write out to localhost disk
"""
output | beam.io.WriteToText('output', file_name_suffix='.csv', header='time, colname1, colname2')
p.run().wait_until_finish()
Заранее большое спасибо за любой совет!
Ура!