Я делаю проект и застрял на KTable.
Я хочу взять записи из темы и поместить их в KTable (хранилище), чтобы у меня была 1 запись на 1ключ.
static KafkaStreams streams;
final Serde<Long> longSerde = Serdes.Long();
final Serde<byte[]> byteSerde = Serdes.ByteArray();
static String topicName;
static String storeName;
final StreamsBuilder builder = new StreamsBuilder();
KStream<Long, byte[]> streamed = builder.stream(topicName, Consumed.with(longSerde, byteSerde));
KTable<Long, byte[]> records = streamed.groupByKey().reduce(
new Reducer<Long>() {
@Override
public Long apply(Long aggValue, Long newValue) {
return newValue;
}
},
storeName);
Это самый близкий мне ответ, я думаю.