Я разработал образец конвейера лучей в Python, который получает некоторые данные из подписки pubsub (элемент данных - это имя человека с его возрастом, цель состоит в том, чтобы подсчитать, сколько людей старше определенного возраста находятся в фиксированном окне ).
Фиксированное окно установлено на 30 секунд, без дополнительной настройки.
Проблема в том, что выход срабатывает случайным образом, после первого вывода конвейер начинает передавать выходные данные (например, 5 или 6 выходов) между текущим окном и следующим, которое должно выдать результат через 60 секунд.
with beam.Pipeline(options=pipeline_options) as p:
data = p | ReadFromPubSub(topic=known_args.input, with_attributes=True, timestamp_attribute="timestamp")
transformed = (data
| 'FormatMessage' >> beam.Map(format_message)
| 'Add Timestamp: %s' >> beam.ParDo(AddTimestampDoFn())
| beam.WindowInto(window.FixedWindows(30))
| "Filter" >> beam.Filter(filter_names, known_args.rules)
| "ReMap" >> beam.Map(lambda x: (x['data']))
| beam.ParDo(CollectTimings())
| 'Group' >> beam.GroupByKey()
| 'Count' >> beam.CombineValues(beam.combiners.CountCombineFn())
)
serialized = (transformed
| beam.Map(lambda x: json.dumps(x))
| beam.Map(printresults)
)
serialized | "Write To PubSub" >> WriteStringsToPubSub(known_args.output)
Насколько я понимаю, основываясь на документации Beam, я должен получать выходные данные (если есть хотя бы одни входные данные) каждые 30 секунд, но я получаю несколько выходных данных в окне.
Что может быть причиной такого поведения?