Я читаю записи из темы PubSub в конвейере потока данных. Записи PubSub делятся на фиксированные окна и затем группируются в каждом окне. Каждое окно сортируется по порядковому номеру, так как нам нужно обрабатывать эти записи по порядку, используя beam.SortValues. Затем я записываю записи в Cloud BigTable
Проблема с конвейером - свежесть данных и системная задержка. Кажется, что свежесть данных застряла в какой-то момент, и водяной знак перестает развиваться.
Я использую следующую стратегию управления окнами для создания записей после шага GroupByKey:
PCollection<KV<BigInteger, JSONObject>> window = pubsubRecords.apply("Raw to String", ParDo.of(new LogsFn()))
.apply("Window", Window
.<KV<BigInteger, JSONObject>>into(FixedWindows.of(Duration.standardSeconds(10)))
.triggering(Repeatedly.forever(AfterFirst.of(
AfterPane.elementCountAtLeast(500),
AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.standardMinutes(1)))))
.withAllowedLateness(Duration.ZERO).discardingFiredPanes()
);
Я думаю, что проблема может быть в стратегии управления окнами. В основном я хочу сделать следующее: чтение записей из PubSub в FixedWindows за 1 минуту, сортировка окна и запись в BigTable. Если я использую триггер defualt, шаг GroupByKey не выдаст никаких результатов. Может ли кто-нибудь помочь мне с этим?