Условная запись с использованием облачного потока данных и pubsub - PullRequest
0 голосов
/ 27 сентября 2018

Я пытаюсь использовать Python API облачного потока данных для написания простой программы, которая принимает входные данные от издателя pubsub, проверяет входные данные для условия и затем выводит данные в тему.Я получил программу, написанную там, где она принимает и преобразует входные данные, а также публикует данные в теме.Моя проблема в том, что я хочу публиковать в теме, только если условие выполняется.Например, если ключевое слово присутствует в данных json, я хочу опубликовать сообщение в теме pubsub, но не публиковать сообщение, когда ключевое слово не существует.Я попытался добавить глобальный логический флаг, который становится истинным, когда ключевое слово найдено, и обернул его вокруг следующих строк:

output = (lines
          | 'format' >> beam.Map(format_result)
          | 'encode' >> beam.Map(lambda x: x.encode('utf-8')).with_output_types(six.binary_type))
output | beam.io.WriteStringsToPubSub(self.output_topic)

Это не сработало.И у меня заканчиваются идеи.Кто-нибудь знает, можно ли это сделать?

1 Ответ

0 голосов
/ 27 сентября 2018

Вы можете использовать ParDo, который возвращает элементы, только когда условие выполнено.Например, рассмотрим следующую структуру сообщения, в которой поле publish указывает, должно ли обработанное сообщение выводиться ParDo или нет:

{"publish":"true","body":"This message should be published"}
{"publish":"false","body":"This message should *NOT* be published"}

Мы получим значения только тогда, когда для publish установлено значениеtrue:

class FilterFn(beam.DoFn):
    def process(self, element):
        if (element['publish'] == 'true'):
          yield element['body']

и код основного конвейера:

lines = p | 'Read messages' >> beam.io.ReadStringsFromPubSub(topic=known_args.input)
jsons = lines | 'Load into JSON' >> beam.Map(lambda x: json.loads(x))
filtered = jsons | 'Filter messages' >> beam.ParDo(FilterFn())
filtered | 'Publish messages' >> beam.io.WriteStringsToPubSub(topic=known_args.output)

Извлечение сообщений из выходной подписки вернет только одно сообщение:

Это сообщение должно быть опубликовано

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...