Вы можете использовать ParDo, который возвращает элементы, только когда условие выполнено.Например, рассмотрим следующую структуру сообщения, в которой поле publish
указывает, должно ли обработанное сообщение выводиться ParDo или нет:
{"publish":"true","body":"This message should be published"}
{"publish":"false","body":"This message should *NOT* be published"}
Мы получим значения только тогда, когда для publish
установлено значениеtrue
:
class FilterFn(beam.DoFn):
def process(self, element):
if (element['publish'] == 'true'):
yield element['body']
и код основного конвейера:
lines = p | 'Read messages' >> beam.io.ReadStringsFromPubSub(topic=known_args.input)
jsons = lines | 'Load into JSON' >> beam.Map(lambda x: json.loads(x))
filtered = jsons | 'Filter messages' >> beam.ParDo(FilterFn())
filtered | 'Publish messages' >> beam.io.WriteStringsToPubSub(topic=known_args.output)
Извлечение сообщений из выходной подписки вернет только одно сообщение:
Это сообщение должно быть опубликовано