ниже приведен фрагмент кода,
streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-live-test");
streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "brokerIP:port");
streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
streamsConfiguration.put(StreamsConfig.TOPOLOGY_OPTIMIZATION, StreamsConfig.OPTIMIZE);
StreamBuilder builder = new StreamBuilder();
KStream streamData = builder.stream(inputTopicName);
streamData.groupByKey(Grouped.with(jsonSerde,jsonSerde))
.aggregate( //some transformation );
KafkaStreams kafkaStreams = new KafkaStreams(
builder.build(streamConfiguration),
streamConfiguration
);
здесь мы не используем окно сеанса, и этот фрагмент дает мне идеальный результат. Но когда я представляю окно сеанса с этим потоком, тогда оно дает исключение указателя null
для агрегатной функции.
здесь кто-нибудь может помочь