Я использую два разных срока хранения для двух разных KTable, и он работает с состояниями RocksDB и темами изменений Kafka.
KTable генерируется из KStream и groupBy
, а затем windowedBy
.
Я считаю, что при объединении KStream
с оконным управлением TimeWindows
то же самое.Мне интересно, будет ли польза или недостаток, если параметры TimeWindows
будут разными, при соединении двух разных таблиц KTable с помощью TimeWindows
?
фрагмента кода:
final KStream<Integer, String> eventStream = builder.stream("events",
Consumed.with(Serdes.Integer(), Serdes.String())
.withOffsetResetPolicy(Topology.AutoOffsetReset.EARLIEST));
final KTable<Windowed<Integer>, String> eventWindowTable = eventStream.groupByKey()
.windowedBy(TimeWindows.of(Duration.ofSeconds(60)).until(Duration.ofSeconds(100).toMillis()))
.reduce((oldValue, newValue) -> newValue);
final KStream<Integer, String> clickStream = builder.stream("clicks",
Consumed.with(Serdes.Integer(), Serdes.String())
.withOffsetResetPolicy(Topology.AutoOffsetReset.EARLIEST));
final KTable<Windowed<Integer>, String> clickWindowTable = clickStream.groupByKey()
.windowedBy(TimeWindows.of(Duration.ofSeconds(30)).until(Duration.ofSeconds(70).toMillis()))
.reduce((oldValue, newValue) -> newValue);
final KTable<Windowed<Integer>, String> join = eventWindowTable.leftJoin(clickWindowTable,
(event, click) -> event + " ; " + click + " ; " + Instant.now()
);
Сначала я думалобъединение двух разных KTable с разными параметрами TimeWindows
не будет работать, потому что объединение основано на TimeWindowedKey, ключе для временного интервала.Но после тестирования это тоже работает.