После игры с Beam я должен спросить здесь. Есть ли способ построить конвейер, который
- Буферизует элементы на X + 2 секунды по времени события;
- Очищает данные первых X секунд;
- Перейти к шагу 1 ;
Я читаю документацию уже неделю, и я в растерянности. Я не могу понять, как работают триггеры.
Мне это нужно, потому что я хочу буферизовать элементы из PubSub, который не гарантирует порядок, но снижает риск, буферизуя немного больше, чем мне нужно.
Вот что у меня есть (Kotlin). Я пытался смоделировать задержку PubSub для каждого 8-го элемента.
val start = Instant.now()
val streamedData = pipe.apply(GenerateSequence.from(1)
.withRate(1, Duration.standardSeconds(1))
.withTimestampFn { value ->
if (value % 10 == 8L)
start.plus((value - 2) * 1000)
else
start.plus(value * 1000)
})
.apply(MapElements.into(TypeDescriptors.strings()).via(ProcessFunction { "Val: $it" }))
val into: Window<String> = Window.into(FixedWindows.of(Duration.standardSeconds(10)))
val zzz: Window<String> = into.triggering(
Repeatedly.forever(AfterProcessingTime
.pastFirstElementInPane()
.plusDelayOf(Duration.standardSeconds(2))))
.withAllowedLateness(Duration.standardMinutes(1))
.discardingFiredPanes()
streamedData
.apply(zzz)
.apply(TextIO.write().to(options.output).withNumShards(1).withWindowedWrites())
Может быть, использование windows и триггеров - это неправильный подход полностью? Возможно, я должен сам использовать StateSpec
и буферизовать элементы. Я согласен с этим, но затем у меня есть десятки вопросов, что случится с моими буферизованными данными, если работа не удастся. Я предполагаю, что это будет уже подтверждено в PubSub.