Пытаюсь сделать что-то концептуально простое, но биться головой о стену.
Я пытаюсь создать потоковое задание потока данных в Python, которое использует сообщения JSON из темы / подписки PubSub, выполняет некоторые базовые манипуляции с каждым сообщением (в данном случае преобразование температуры из C в F), а затем публикует запись вернуться на другую тему:
from __future__ import absolute_import
import logging
import argparse
import apache_beam as beam
import apache_beam.transforms.window as window
import json
'''Normalize pubsub string to json object'''
# Lines look like this:
#{"temperature": 29.223036004820123}
def transform_temp(line):
record = json.loads(line)
record['temperature'] = record['temperature'] * 1.8 + 32
return json.dumps(record)
def run(argv=None):
"""Build and run the pipeline."""
parser = argparse.ArgumentParser()
parser.add_argument(
'--output_topic', required=True,
help=('Output PubSub topic of the form '
'"projects/<PROJECT>/topic/<TOPIC>".'))
group = parser.add_mutually_exclusive_group(required=True)
group.add_argument(
'--input_topic',
help=('Input PubSub topic of the form '
'"projects/<PROJECT>/topics/<TOPIC>".'))
group.add_argument(
'--input_subscription',
help=('Input PubSub subscription of the form '
'"projects/<PROJECT>/subscriptions/<SUBSCRIPTION>."'))
known_args, pipeline_args = parser.parse_known_args(argv)
with beam.Pipeline(argv=pipeline_args) as p:
# Read the pubsub topic into a PCollection.
lines = ( p | beam.io.ReadStringsFromPubSub(known_args.input_topic)
| beam.Map(transform_temp)
| beam.io.WriteStringsToPubSub(known_args.output_topic)
)
if __name__ == '__main__':
logging.getLogger().setLevel(logging.INFO)
run()
При локальном запуске этого кода с помощью DirectRunner все работает нормально. Однако при переходе на DataflowRunner я никогда не вижу сообщений, опубликованных по новой теме.
Я также пытался добавить некоторые вызовы журналирования в функцию transform_temp, но ничего не вижу в журналах консоли в потоке данных.
Есть предложения? Кстати, если я просто добавлю тему ввода в тему вывода, я смогу увидеть сообщения, чтобы убедиться, что потоковая передача работает нормально.
Большое спасибо!