Я работаю над сценарием использования, где мне нужно запросить KTable (с использованием подхода локальных хранилищ значений ключей). Мои образцы данных, которые присутствуют в теме:
A,Blue
A,Blue
A,Yellow
A,Red
A,Yellow
A,Yellow
B,Blue
C,Red
C,Red
B,Blue
На основе ввода Iхочу сгенерировать вывод и сохранить в теме:
A Blue:2,Yellow:3,Red:1
B Blue:2
C Red:2
Подход: 1) Я сначала выполнил операцию подсчета, прочитав данные темы в Kstream.
//set the properties for interactive queries
props.put(StreamsConfig.APPLICATION_SERVER_CONFIG,"localhost:9092" );
props.put(StreamsConfig.STATE_DIR_CONFIG, "D:\\Kafka_data\\Local_store");
//read the user input from Kafka topic: data
final KStream<String,String> userDataSource = builder.stream("data");
final KGroupedStream<String,String> inputData = userDataSource.
map((key, value) -> new KeyValue<>(value.split(",")[0].toString() + "_"+ value.split(",")[1].toString() , value.split(",")[1].toString()) )
.selectKey((s, s2) -> s)
.groupByKey(Grouped.with(Serdes.String(),Serdes.String()));
final KTable<String,Long> inputAggregationResult = inputData.count();
Результат приведенного выше кода:
A_Blue 1
A_Yellow 1
A_Red 1
A_Yellow 2
A_Yellow 3
B_Blue 1
C_Red 1
C_Red 2
B_Blue 2
2) Затем сохраните результат в теме:
inputAggregationResult.toStream().to("input-data-aggregation", Produced.with(Serdes.String(), Serdes.Long()));
3) Теперь считывает данные из темы (входные данные-агрегация) как Ktable, чтобы я мог запросить.
final StreamsBuilder builder = new StreamsBuilder();
KTable<String, Object> ktableInformation = builder.table("input-data-aggregation", Materialized.<String, Object, KeyValueStore<Bytes, byte[]>>as("CountsValueStore"));
final KafkaStreams streams = new KafkaStreams(builder.build(), props);
streams.cleanUp();
streams.start();
ReadOnlyKeyValueStore<String, Object> keyValueStore;
Map<String,Object> information = new LinkedHashMap<String,Object>();
while (true) {
try {
// Get the key-value store CountsKeyValueStore
keyValueStore =
streams.store(ktableInformation.queryableStoreName(), QueryableStoreTypes.keyValueStore());
//read the value
KeyValueIterator<String, Object> range = keyValueStore.range("all", "streams");
while (range.hasNext()) {
KeyValue<String, Object> next = range.next();
information.put(next.key,next.value);
System.out.println("count for " + next.key + ": " + next.value);
}
// close the iterator to release resources
range.close();
} catch (InvalidStateStoreException ignored) {
ignored.printStackTrace();
}
}
4) Когда я пытаюсь запросить данные, он выдает пустые данные (не выводятся данные для печати).
Может ли кто-нибудь мне помочь, если я пропустил какой-либо шаг как часть запросаместные магазины значения ключа? или любая другая альтернатива для достижения целевого результата. Я подтвердил, что Kafka записывает данные хранилища локальных ключей в мой локальный экземпляр, но при чтении (запросе) данных он дает пустой результат.