Облачный поток данных Google - потоковая передача JSON из Python в PubSub - различия между DirectRunner и DataflowRunner - PullRequest
0 голосов
/ 29 июня 2018

Пытаюсь сделать что-то концептуально простое, но биться головой о стену.

Я пытаюсь создать потоковое задание потока данных в Python, которое использует сообщения JSON из темы / подписки PubSub, выполняет некоторые базовые манипуляции с каждым сообщением (в данном случае преобразование температуры из C в F), а затем публикует запись вернуться на другую тему:

from __future__ import absolute_import

import logging
import argparse
import apache_beam as beam
import apache_beam.transforms.window as window
import json

'''Normalize pubsub string to json object'''
# Lines look like this:
#{"temperature": 29.223036004820123}


def transform_temp(line):
    record = json.loads(line)
    record['temperature'] = record['temperature'] * 1.8 + 32
    return json.dumps(record)

def run(argv=None):
  """Build and run the pipeline."""

  parser = argparse.ArgumentParser()
  parser.add_argument(
      '--output_topic', required=True,
      help=('Output PubSub topic of the form '
            '"projects/<PROJECT>/topic/<TOPIC>".'))
  group = parser.add_mutually_exclusive_group(required=True)
  group.add_argument(
      '--input_topic',
      help=('Input PubSub topic of the form '
            '"projects/<PROJECT>/topics/<TOPIC>".'))
  group.add_argument(
      '--input_subscription',
      help=('Input PubSub subscription of the form '
            '"projects/<PROJECT>/subscriptions/<SUBSCRIPTION>."'))
  known_args, pipeline_args = parser.parse_known_args(argv)

  with beam.Pipeline(argv=pipeline_args) as p:
    # Read the pubsub topic into a PCollection.
    lines = ( p | beam.io.ReadStringsFromPubSub(known_args.input_topic)
                | beam.Map(transform_temp)
                | beam.io.WriteStringsToPubSub(known_args.output_topic)
              )

if __name__ == '__main__':
  logging.getLogger().setLevel(logging.INFO)
  run()

При локальном запуске этого кода с помощью DirectRunner все работает нормально. Однако при переходе на DataflowRunner я никогда не вижу сообщений, опубликованных по новой теме.

Я также пытался добавить некоторые вызовы журналирования в функцию transform_temp, но ничего не вижу в журналах консоли в потоке данных.

Есть предложения? Кстати, если я просто добавлю тему ввода в тему вывода, я смогу увидеть сообщения, чтобы убедиться, что потоковая передача работает нормально.

Большое спасибо!

1 Ответ

0 голосов
/ 15 февраля 2019

Возможно, вам просто не хватает функции windowinto. Документы Apache Beam утверждают, что для потокового конвейера необходимо установить окно не по умолчанию или триггер не по умолчанию. Поскольку вы не определили окно, у вас есть одно глобальное окно, и поэтому оно может бесконечно ждать в конце окна, прежде чем идти в приемник.

...