Задержка в обработке с Kafka Streams - PullRequest
0 голосов
/ 19 октября 2019

Я создал потоковую топологию, и у меня 1 Source и 2 Sink.

Я использую Spring Boot (2.1.9) с Kafka Streams и не использую Spring Cloud. Kafka Version 2.3.0

@Configuration
@EnableKafkaStreams
public class StreamStart {

@Bean
public KStream<String, String> process(StreamsBuilder builder){
KStream<String,String> inputStream = builder.stream("streamIn", Consumed.with(Serdes.String(),Serdes.String()));
         KStream<String,String> upperCaseStream = inputStream.mapValues(value->value.toUpperCase());

         upperCaseStream.to("outTopic", Produced.with(Serdes.String(),Serdes.String()));

         KTable<String, Long> wordCounts = upperCaseStream.flatMapValues(v-> Arrays.asList(v.split(" "))).
                 selectKey((k, v) -> v).groupByKey(Serialized.with(Serdes.String(),Serdes.String())).
                count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("counts-store"));
         wordCounts.toStream().to("wordCountTopic", Produced.with(Serdes.String(),Serdes.Long()));

         return upperCaseStream;


}
}

Данные передаются в outTopic мгновенно, тогда как отображение данных в wordCountTopic занимает 20-25 секунд для каждой записи. Любые предложения ??

...