Я хочу объединить два потока topi c (левое соединение) и выполнить агрегирование на основе окон по объединенному потоку. Однако при объединении некоторые сообщения считаются дважды, так как при соединении некоторые сообщения отправляются дважды, в зависимости от задержки в правом верхнем углу c. Ниже приведен код для заказа на поставку C.
StreamsBuilder builder = new StreamsBuilder();
KStream<Long, BidMessage> bidStream = builder.stream("bid", Consumed.with(new LongSerde(), new BidMessageSerde()).withTimestampExtractor(new BidMessageTimestampExtractor()));
KStream<Long, ClickMessage> clickStream = builder.stream("click", Consumed.with(new LongSerde(), new ClickMessageSerde()).withTimestampExtractor(new ClickMessageTimestampExtractor()));
KStream<String, BidMessage> newBidStream = bidStream.selectKey((key, value) -> value.getRequestId());
KStream<String, CLickMessage> newClickStream = impStream.selectKey((key, value) -> value.getRequestId());
KStream<String, BidMergedMessage> result = newBidStream.leftJoin(newImpStream,
getValueJoiner(),
JoinWindows.of(Duration.ofSeconds(30)).grace(Duration.ofSeconds(0)),
Joined.with(Serdes.String(), new BidMessageSerde(), new ClickMessageSerde()));
result.groupBy((key, value) -> "" + value.getClientId(), Grouped.with(Serdes.String(), newBidMergedSerde()))
.windowedBy(TimeWindows.of(Duration.ofSeconds(30)).grace(Duration.ofSeconds(40)))
.aggregate(() -> new AggResult(0, 0), (key, value, aggregate) -> {
if (value.getClickId() != null) {
aggregate.clicks_++;
}
aggregate.bids_++;
return aggregate;
}, Materialized.with(Serdes.String(),new AggResultJsonSerde()))
.suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
.toStream()
.foreach((key, value) -> {
logger.info("{}-{}, clientId : {}, Value: {}", new Date(key.window().start()), new Date(key.window().end()),key.key(), value);
});
final KafkaStreams streams = new KafkaStreams(builder.build(), props);
streams.start();
Можно ли исправить это, чтобы избежать дубликатов из-за объединения?