Невозможно выполнить запрос в локальном хранилище ключей-значений (Kafka-Stream) - PullRequest
0 голосов
/ 18 октября 2019

Я работаю над сценарием использования, где мне нужно запросить KTable (с использованием подхода локальных хранилищ значений ключей). Мои образцы данных, которые присутствуют в теме:

A,Blue
A,Blue 
A,Yellow
A,Red
A,Yellow
A,Yellow
B,Blue
C,Red
C,Red
B,Blue

На основе ввода Iхочу сгенерировать вывод и сохранить в теме:

A Blue:2,Yellow:3,Red:1
B Blue:2
C Red:2

Подход: 1) Я сначала выполнил операцию подсчета, прочитав данные темы в Kstream.

     //set the properties for interactive queries
     props.put(StreamsConfig.APPLICATION_SERVER_CONFIG,"localhost:9092" );
        props.put(StreamsConfig.STATE_DIR_CONFIG, "D:\\Kafka_data\\Local_store");

 //read the user input from Kafka topic: data
        final KStream<String,String> userDataSource = builder.stream("data");

        final KGroupedStream<String,String> inputData = userDataSource.
                map((key, value) -> new KeyValue<>(value.split(",")[0].toString() +  "_"+ value.split(",")[1].toString() , value.split(",")[1].toString()) )
                .selectKey((s, s2) -> s)
                .groupByKey(Grouped.with(Serdes.String(),Serdes.String()));


        final KTable<String,Long> inputAggregationResult  = inputData.count();

Результат приведенного выше кода:

A_Blue 1
A_Yellow 1
A_Red 1
A_Yellow 2
A_Yellow 3
B_Blue 1
C_Red 1
C_Red 2
B_Blue 2

2) Затем сохраните результат в теме:

inputAggregationResult.toStream().to("input-data-aggregation", Produced.with(Serdes.String(), Serdes.Long()));

3) Теперь считывает данные из темы (входные данные-агрегация) как Ktable, чтобы я мог запросить.

final StreamsBuilder builder = new StreamsBuilder();

KTable<String, Object> ktableInformation = builder.table("input-data-aggregation", Materialized.<String, Object, KeyValueStore<Bytes, byte[]>>as("CountsValueStore"));

 final KafkaStreams streams = new KafkaStreams(builder.build(), props);

streams.cleanUp();
 streams.start();

 ReadOnlyKeyValueStore<String, Object> keyValueStore;
        Map<String,Object> information = new LinkedHashMap<String,Object>();
        while (true) {
            try {

                // Get the key-value store CountsKeyValueStore
                 keyValueStore =
                        streams.store(ktableInformation.queryableStoreName(), QueryableStoreTypes.keyValueStore());


                //read the value
                KeyValueIterator<String, Object> range = keyValueStore.range("all", "streams");
                while (range.hasNext()) {
                    KeyValue<String, Object> next = range.next();
                    information.put(next.key,next.value);
                    System.out.println("count for " + next.key + ": " + next.value);
                }
                // close the iterator to release resources
                range.close();


            } catch (InvalidStateStoreException ignored) {
                ignored.printStackTrace();


            }
        }

4) Когда я пытаюсь запросить данные, он выдает пустые данные (не выводятся данные для печати).

Может ли кто-нибудь мне помочь, если я пропустил какой-либо шаг как часть запросаместные магазины значения ключа? или любая другая альтернатива для достижения целевого результата. Я подтвердил, что Kafka записывает данные хранилища локальных ключей в мой локальный экземпляр, но при чтении (запросе) данных он дает пустой результат.

...