Beam Pipeline (PY) Выход не записывается на локальный диск - PullRequest
0 голосов
/ 02 января 2019

Я изучаю концепции Window & Triggering в Apache с целью:

  • чтения для неограниченных источников (PubSub)
  • запись входящего сообщения на диск localhost каждые 5 сек. ИСПРАВЛЕНО Интервал окна

ВЫПУСК: нет записи, записываемой на диск localhost (конвейер создал папку beam-team- и записал туда несколько файлов)но output.csv в назначенном получателе записывается каждые 5 секунд)

  • работает apache-beam == 2.9.0, Python 2.7.10
  • Пробовал оба: DirectRunner , а также DataFlowRunner (с GCS Bucket в качестве пункта назначения)

Вот код (заранее большое спасибо за любые советы):

p = beam.Pipeline(runner=None, options=options, argv=None)

"""
#1) Read incoming messages & apply Windowing
"""
lines = p | "read_sub" >> beam.io.gcp.pubsub.ReadFromPubSub(topic=None, subscription=SUBSCRIBER, with_attributes=True) \


"""
#2) Apply 5 sec Windowing
"""
          | 'window' >> beam.WindowInto(beam.window.FixedWindows(5))


"""
#3) apply Map() ops
"""
output = lines | "pardo" >> beam.Map(lambda x: x.data)


"""
#4) write out to localhost disk
"""

output | beam.io.WriteToText('output', file_name_suffix='.csv', header='time, colname1, colname2')

p.run().wait_until_finish()

Заранее большое спасибо за любой совет!

Ура!

1 Ответ

0 голосов
/ 16 января 2019

Вы читаете из неограниченного источника и пытаетесь записать в ограниченный источник.Хотя Beam API для Java поддерживает его, используя метод withWindowedWrites , но он пока не поддерживается в Python, что является долгожданной полезной функцией.Поэтому вам нужно либо переключиться на Java, либо записать его в BigQuery.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...