У меня есть конвейер, который получает поток событий от PubSub, применяя 1-часовое окно, а затем записывает их в файл в Google Cloud Storage.Недавно я понял, что иногда в 1-часовом окне происходит слишком много событий, поэтому я также добавил триггер, который срабатывает, если на панели находится более 100 000 событий.Теперь проблема в том, что ограничение в 100К запускается, только когда одна группа внутри окна превышает номер, но не весь конвейер.
Соответствующая часть конвейера выглядит следующим образом:
PCollection<String> rawEvents = pipeline
.apply("Read PubSub Events",
PubsubIO.readStrings()
.fromSubscription(options.getInputSubscription()));
rawEvents
.apply("1h Window",
Window.<String>into(FixedWindows.of(Duration.standardHours(1))
.triggering(
Repeatedly
.forever(
AfterFirst.of(
AfterPane.elementCountAtLeast(100000),
AfterWatermark.pastEndOfWindow())))
.discardingFiredPanes()
.withAllowedLateness(Duration.standardDays(7),
Window.ClosingBehavior.FIRE_IF_NON_EMPTY)
.withOnTimeBehavior(Window.OnTimeBehavior.FIRE_IF_NON_EMPTY))
.apply("Write File(s)", new WriteFiles(options, new EventPartitioner()));
Компонент WriteFiles
- это PTransform
, который расширяется до FileIO.Write
, и он группирует элементы по ключу.
Как бы я мог сделать так, чтобы окно срабатывало после полногоиз 100 тыс. событий, находящихся в конвейере, а не 100 тыс. событий для конкретной группы?Заранее спасибо!