Я изучал Kafka Streams API и реализовал пример WordCountExample в Scala, а также в Java, но когда я запускаю код Scala, он дает очень медленный вывод по сравнению с примером Java.
Также я заметил, что когда я производил запись по теме wordCountInput , она мгновенно появляется в теме sinkWordCount , но результат wordCount не отображается мгновенно в wordCountOutput *Тема 1012 *, если мы запускаем код Scala , но если мы запускаем код Java , то все работает нормально, и я сразу вижу вывод в теме wordCountOutput .
Java-код:
public class WordCountExample {
public static void main(String[] args) {
Properties p = new Properties();
p.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-application");
p.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.1.93:9092");
p.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
p.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> textLines = builder.stream("wordCountInput");
KTable<String, String> wordCounts = textLines
.flatMapValues(textLine -> Arrays.asList(textLine.toLowerCase().split("\\+W")))
.groupBy((key, word) -> word)
.count()
.mapValues((key, value) -> value.toString());
textLines.to("sinkWordCount");
wordCounts.toStream().to("wordCountOutput");
KafkaStreams streams = new KafkaStreams(builder.build(), p);
streams.cleanUp();
streams.start();
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
streams.close();
}));
}
}
POM Файл Java-программы
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.mycompany</groupId>
<artifactId>KafkaStreamExample</artifactId>
<version>1.0-SNAPSHOT</version>
<packaging>jar</packaging>
<dependencies>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams</artifactId>
<version>2.0.0</version>
</dependency>
</dependencies>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
</properties>
</project>
Scala-код:
object KafkaStreamExample extends App {
val props: Properties = {
val p = new Properties()
p.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-application")
p.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.1.93:9092")
p.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String.getClass)
p.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String.getClass)
p
}
val builder: StreamsBuilder = new StreamsBuilder()
val sourceStream: KStream[String, String] = builder.stream[String, String]("wordCountInput")
val wordCounts: KTable[String, String] = sourceStream
.flatMapValues(textLine => textLine.toLowerCase.split("\\W+"))
.groupBy((key, word) => word)
.count()
.mapValues((key, value) => value.toString)
wordCounts.toStream.to("wordCountOutput")
sourceStream.to("sinkWordCount")
val stream: KafkaStreams = new KafkaStreams(builder.build(), props)
sys.ShutdownHookThread {
stream.close()
}
stream.cleanUp()
stream.start()
}
Scala SBT:
name := "KafkaStreamExample"
version := "0.1"
scalaVersion := "2.12.6"
libraryDependencies += "org.apache.kafka" %% "kafka-streams-scala" % "2.0.0"
libraryDependencies += "javax.ws.rs" % "javax.ws.rs-api" % "2.1" artifacts( Artifact("javax.ws.rs-api", "jar", "jar"))
Kafka Версия: kafka_2.11-2.0.0
так что любой может сказать мне, почему пример scala не выполняется так быстро, как пример java.
Обновление
Установка cache.max.bytes.buffering
этого свойства в 0 в коде Scalaдает мгновенный вывод WordCount в Scala, но это не рекомендуется использовать в производственной среде в соответствии с этим ответом Stackoverflow также в Java-коде нам не нужно устанавливать это свойство, но все же Java дает быстрый вывод