Возникла проблема с потоком данных, которую я надеюсь исправить. По существу, конвейер работает правильно при непрерывной потоковой передаче данных, однако, если я сбрасываю несколько файлов вместе, я сталкиваюсь с проблемой, когда данные не загружаются в bq, и я вижу сотни гигабайт в подэтапе пакетной загрузки на шаге больших запросов
validCollection = validCollection.apply("Specify Window",
Window.<SData>into(FixedWindows.of(Duration.standardMinutes(5)))
.triggering(Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane()))
.discardingFiredPanes()
.withAllowedLateness(Duration.standardDays(1))
Это код, который у меня есть выше. Мне интересно, может ли кто-нибудь указать мне, почему вышеизложенное не отправляет данные в БК по несколько часов подряд? Я думаю, что это как-то связано с моим триггером, и мне нужно иметь временный триггер или что-то в этом роде, но я понимаю, что это может привести к потере данных, так как связки не принимаются после факта (полностью открыты для исправления)
Большое спасибо заранее