Beam / Python - триггер AfterProcessingTime не работает? - PullRequest
0 голосов
/ 08 мая 2019

Я написал небольшой конвейер Beam на Python для подсчета сообщений, отправленных в чате.Сообщения доставляются в Pub / Sub, и у них уже есть временная метка, которую я использую с apache-beam.

Я использую фиксированное время в 10 минут, и я хотел бы выводить результаты каждыйминут ... но это не работает.У меня вывод каждые 10 минут (размер окна).

events = \
            (p
             | 'ReadPubSub' >> beam.io.ReadFromPubSub(topic=known_args.input_topic).with_output_types(bytes)
             | 'DecodeString' >> beam.Map(lambda e: e.decode('utf-8'))
             | 'TransformJsonToDictionary' >> beam.Map(lambda e: json.loads(e))
             | 'ParseEventsFn' >> beam.ParDo(ParseEventsFn())
             | 'AddEventTimestamps' >> beam.Map(
                        lambda elem: beam.window.TimestampedValue(elem, elem['timestamp']))
             | 'ApplyWindow' >> beam.WindowInto(
                        window.FixedWindows(size=10 * 60),
                        trigger=trigger.Repeatedly(
                            trigger.AfterProcessingTime(delay=1 * 60)
                        ),
                        accumulation_mode=trigger.AccumulationMode.DISCARDING)
             | 'PairWithOne' >> beam.Map(lambda e: (e['channel_id'], 1))
             | beam.CombinePerKey(sum)
             | 'DEBUG' >> beam.ParDo(PrintFn('DEBUG')))

Я что-то здесь упускаю?Как добиться чего-то подобного, используя событие-время?

спасибо

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...