исключение nullpointer в агрегатной операции в потоке kafka - PullRequest
0 голосов
/ 04 мая 2020

ниже приведен фрагмент кода,

streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-live-test"); 
streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "brokerIP:port"); 
streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); 
streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); 
streamsConfiguration.put(StreamsConfig.TOPOLOGY_OPTIMIZATION, StreamsConfig.OPTIMIZE);

StreamBuilder builder = new StreamBuilder(); 

KStream streamData = builder.stream(inputTopicName);

streamData.groupByKey(Grouped.with(jsonSerde,jsonSerde)) 
    .aggregate( //some transformation );

KafkaStreams kafkaStreams = new KafkaStreams(
    builder.build(streamConfiguration), 
    streamConfiguration
);

здесь мы не используем окно сеанса, и этот фрагмент дает мне идеальный результат. Но когда я представляю окно сеанса с этим потоком, тогда оно дает исключение указателя null для агрегатной функции.

здесь кто-нибудь может помочь

...