Я использую Kafka DSL.Как бы я продолжил подавлять вывод агрегации (поведение, подобное this ) с пользовательским условием?
Скажем, для каждого ключа у меня могут быть события START и STOP.Я хочу агрегировать этот ключ только при наступлении события STOP или по истечении времени ожидания.
Требуемый поток будет примерно таким:
time input-topic output-topic
1 key1:{type:start, time: 0} ...
3 key2:{type:start, time: 2} ...
4 key1:{type:stop, time:3} ...
4+e ... key1:{type:closed, duration:3}
61 ... ...
61+e ... key2:{type:timeout, duration:60}
, где время ожидания составляет 60 единиц времени.и e - произвольное время, необходимое потоку для обработки события.
Код (пока псевдокод) будет выглядеть примерно так:
KStream<String,String> sourceStream = builder.stream("input-topic", Consumed.with(stringSerializer, stringSerializer));
KGroupedStream<String, String> groupedStream = sourceStream
.groupByKey();
KTable<String, String> aggregatedStream = groupedStream
.suppress(Suppressed.untilWindowCloses(myCustomCondition()))
.aggregate(
() -> null,
(aggKey, newValue, aggValue) -> aggregateStartStop(aggValue, newValue),
Materialized
.<String, String, KeyValueStore<Bytes, byte[]>>as("aggregated-stream-store")
.withValueSerde(Serdes.String())
);
aggregatedStream.toStream();
KafkaStreams streams = new KafkaStreams(builder.build(), streamsSettings);
streams.start();