У меня есть тема, в которую поступает поток данных.Мне нужно создать отдельную тему из этой темы, которая имеет только последний набор значений с учетом ключей.
Я думал, что вся цель KTable заключается в том, что он будет хранить последнее значение с заданным ключом, а не сохранятьвесь поток событий.Однако я не могу заставить это работать.Выполнение приведенного ниже кода создает хранилище ключей, но в этом хранилище ключей (maintopiclatest) есть поток событий (а не только последние значения).Поэтому, если я отправляю запрос с 1000 записями в теме дважды, а не вижу 1000 записей, я вижу 2000 записей.
var serializer = new KafkaSpecificRecordSerializer();
var deserializer = new KafkaSpecificRecordDeserializer();
var stream = kStreamBuilder.stream("maintopic",
Consumed.with(Serdes.String(), Serdes.serdeFrom(serializer, deserializer)));
var table = stream
.groupByKey()
.reduce((aggV, newV) -> newV, Materialized.as("maintopiclatest"));
Другая проблема заключается в том, что если я хочу сохранить KTable в новой теме, яЯ не уверен, как это сделать.Чтобы сделать это, мне кажется, что я должен превратить его обратно в поток, чтобы я мог вызвать «.to» на нем.Но тогда в нем есть весь поток событий, а не только последние значения.