Kafka Streams DSL (язык, специфичный для домена) построен поверх API Streams Processor.
Он использует низкоуровневые API-интерфейсы процессора с реализацией под ним для чтения сообщений из тем Кафки. Вот подробная архитектура:
https://kafka.apache.org/20/documentation/streams/architecture
Streams DSL построен поверх Processor API. Если вы глубоко погрузитесь в Processor API, вы увидите, как эти функции могут быть реализованы и могут быть легко вызваны с помощью одной строки кода:
https://kafka.apache.org/20/documentation/streams/developer-guide/processor-api.html
Так работают операции Stream DSL. При написании приложения KStream с использованием Streams DSL большинство операций можно вызывать в несколько строк кода, но под ним реализована полная реализация
Вот пример подсчета слов:
https://github.com/confluentinc/kafka-streams-examples/blob/5.0.0-post/src/main/java/io/confluent/examples/streams/WordCountLambdaExample.java
Первоначально каждая операция преобразуется в ProcessorNode . Таким образом, чтение из темы преобразуется в SourceNode , а запись в тему - SinkNode .
И все узлы добавляются в топологию последовательно.
Вы можете увидеть больше деталей в исходном коде для StreamsBuilder и StreamTask. Это даст вам представление о том, как построить и запустить топологию:
https://github.com/apache/kafka/blob/2.0/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
Ниже приведен пример приложения KStream для Wordcount. Допустим, «wordcount-input» - это тема ввода, а «wordcount-output» - тема вывода:
final String bootstrapServers = args.length > 0 ? args[0] : "localhost:9092";
final Properties streamsConfiguration = new Properties();
streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-lambda-example");
streamsConfiguration.put(StreamsConfig.CLIENT_ID_CONFIG, "wordcount-lambda-example-client");
streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); // add if you want to reset the offset to earliest for each run
final Serde<String> stringSerde = Serdes.String();
final Serde<Long> longSerde = Serdes.Long();
final StreamsBuilder builder = new StreamsBuilder();
final KStream<String, String> textLines = builder.stream("wordcount-input");
final Pattern pattern = Pattern.compile("\\W+", Pattern.UNICODE_CHARACTER_CLASS);
final KTable<String, Long> wordCounts = textLines
.flatMapValues(value -> Arrays.asList(pattern.split(value.toLowerCase())))
.groupBy((key, word) -> word)
.count();
// Write the `KTable<String, Long>` to the output topic.
wordCounts.toStream().to("wordcount-output", Produced.with(stringSerde, longSerde));
final KafkaStreams streams = new KafkaStreams(builder.build(), streamsConfiguration);
streams.cleanUp();
streams.start();