Я создал потоковую топологию, и у меня 1 Source и 2 Sink.
Я использую Spring Boot (2.1.9) с Kafka Streams и не использую Spring Cloud. Kafka Version 2.3.0
@Configuration
@EnableKafkaStreams
public class StreamStart {
@Bean
public KStream<String, String> process(StreamsBuilder builder){
KStream<String,String> inputStream = builder.stream("streamIn", Consumed.with(Serdes.String(),Serdes.String()));
KStream<String,String> upperCaseStream = inputStream.mapValues(value->value.toUpperCase());
upperCaseStream.to("outTopic", Produced.with(Serdes.String(),Serdes.String()));
KTable<String, Long> wordCounts = upperCaseStream.flatMapValues(v-> Arrays.asList(v.split(" "))).
selectKey((k, v) -> v).groupByKey(Serialized.with(Serdes.String(),Serdes.String())).
count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("counts-store"));
wordCounts.toStream().to("wordCountTopic", Produced.with(Serdes.String(),Serdes.Long()));
return upperCaseStream;
}
}
Данные передаются в outTopic мгновенно, тогда как отображение данных в wordCountTopic занимает 20-25 секунд для каждой записи. Любые предложения ??