Как регистрировать входящие сообщения в конвейере Apache Beam - PullRequest
0 голосов
/ 06 июля 2019

Я пишу простой конвейер потоковой передачи лучей Apache, беру вводные данные из темы pubsub и сохраняю их в больших запросах.В течение нескольких часов я думал, что не могу даже прочитать сообщение, так как я просто пытался записать ввод в консоль:

events = p | 'Read PubSub' >> ReadFromPubSub(subscription=SUBSCRIPTION)
logging.info(events)

Когда я пишу это в текст, он работает отлично!Однако мой вызов logger никогда не происходит.

Как люди разрабатывают / отлаживают эти потоковые конвейеры?

Я попытался добавить следующую строку: events | 'Log' >> logging.info(events)

Использование print() также не дает результатов в консоли.

1 Ответ

1 голос
/ 06 июля 2019

Это потому, что events является PCollection, поэтому вам необходимо применить к нему PTransform.

Самый простой способ - применить ParDo к events:

events | 'Log results' >> beam.ParDo(LogResults())

который определяется как:

class LogResults(beam.DoFn):
  """Just log the results"""
  def process(self, element):
    logging.info("Pub/Sub event: %s", element)
    yield element

Обратите внимание, что я также выдаю элемент в случае, если вы хотите применить дальнейшие шаги вниз по течению, такие как запись в приемник после регистрации элементов. См., Например, номер здесь .

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...