Пустые данные возвращаются при запросе с использованием падающего окна Кафки - PullRequest
1 голос
/ 26 мая 2019

Я пытаюсь запросить хранилище состояний, чтобы получить данные в окне за 5 минут.Для этого я использую tumbling window.Добавили REST для запроса данных.У меня есть stream A, который использует данные из topic1, выполняет некоторые преобразования и выводит значение ключа в topic2.Сейчас в stream B я делаю операцию переворачивания окна на topic2 данных.Когда я запускаю код и запрашиваю с помощью REST, я вижу пустые данные в моем браузере.Я вижу, как текут данные в государственном магазине.

Я наблюдал, вместо того чтобы topic2 получать данные из stream A, я использовал класс продюсера для ввода данных в topic2 и мог запрашивать данные из браузера.Но когда topic2 получает данные от stream A, я получаю пустые данные.

Вот мой stream A код:

public static void main(String[] args) {    

                final StreamsBuilder builder = new StreamsBuilder();
                KStream<String, String> source = builder.stream("topic1");
                KStream<String, String> output = source
                        .map((k,v)->
                        {                                                                       
                            Map<String, Object> Fields = new LinkedHashMap<>();

                            Fields.put("FNAME","ABC");
                            Fields.put("LNAME","XYZ");

                            Map<String, Object> nFields = new LinkedHashMap<>();
                            nFields.put("ADDRESS1","HY");
                            nFields.put("ADDRESS2","BA");               
                            nFields.put("addF",Fields);

                            Map<String, Object> eve = new LinkedHashMap<>();                            
                            eve.put("nFields", nFields);

                            Map<String, Object> fevent = new LinkedHashMap<>();
                            fevent.put("eve", eve);             
                            LinkedHashMap<String, Object> newMap = new LinkedHashMap<>(fevent);                                 

                            return new KeyValue<>("JAY1234",newMap.toString());  
                        });

                output.to("topic2");        

}

Вот мой stream B код (где происходит переворачиваемое окно):

public static void main(String[] args) {

    final StreamsBuilder builder = new StreamsBuilder();
    KStream<String, String> eventStream = builder.stream("topic2");

    eventStream.groupByKey()
        .windowedBy(TimeWindows.of(300000))         
        .reduce((v1, v2) -> v1 + ";" + v2, Materialized.as("TumblingWindowPoc"));

    final Topology topology = builder.build();      
    KafkaStreams streams = new KafkaStreams(topology, props);   
    streams.start();      
}

REST-код:

@GET()
    @Path("/{storeName}/{key}")
    @Produces(MediaType.APPLICATION_JSON)
    public List<KeyValue<String, String>> windowedByKey(@PathParam("storeName") final String storeName,
            @PathParam("key") final String key) {

        final ReadOnlyWindowStore<String, String> store = streams.store(storeName,
                QueryableStoreTypes.<String, String>windowStore());
        if (store == null) {
            throw new NotFoundException();      }

        long timeTo = System.currentTimeMillis(); 
        long timeFrom = timeTo - 30000;         
        final WindowStoreIterator<String> results = store.fetch(key, timeFrom, timeTo);

        final List<KeyValue<String,String>> windowResults = new ArrayList<>();
        while (results.hasNext()) {
            final KeyValue<Long, String> next = results.next();     
            windowResults.add(new KeyValue<String,String>(key + "@" + next.key, next.value));
        }
        return windowResults;
    }

И вот так выглядят мои данные значения ключа:

JAY1234 {eve = {nFields = {ADDRESS1 = HY,ADDRESS2 = BA,Fields = {FNAME = ABC,LNAME = XYZ,}}}}

Я смогу получить данные при запросе с помощью REST.Любая помощь очень ценится.Спасибо!

...