Низкая производительность API Kafka Stream Scala - PullRequest
0 голосов
/ 28 октября 2018

Я изучал Kafka Streams API и реализовал пример WordCountExample в Scala, а также в Java, но когда я запускаю код Scala, он дает очень медленный вывод по сравнению с примером Java.

Также я заметил, что когда я производил запись по теме wordCountInput , она мгновенно появляется в теме sinkWordCount , но результат wordCount не отображается мгновенно в wordCountOutput *Тема 1012 *, если мы запускаем код Scala , но если мы запускаем код Java , то все работает нормально, и я сразу вижу вывод в теме wordCountOutput .

Java-код:

public class WordCountExample {

        public static void main(String[] args) {
            Properties p = new Properties();
            p.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-application");
            p.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.1.93:9092");
            p.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
            p.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

            StreamsBuilder builder = new StreamsBuilder();
            KStream<String, String> textLines = builder.stream("wordCountInput");
            KTable<String, String> wordCounts = textLines
                    .flatMapValues(textLine -> Arrays.asList(textLine.toLowerCase().split("\\+W")))
                    .groupBy((key, word) -> word)
                    .count()
                    .mapValues((key, value) -> value.toString());

            textLines.to("sinkWordCount");

            wordCounts.toStream().to("wordCountOutput");

            KafkaStreams streams = new KafkaStreams(builder.build(), p);
            streams.cleanUp();
            streams.start();

            Runtime.getRuntime().addShutdownHook(new Thread(() -> {
                streams.close();
            }));
        }
}

POM Файл Java-программы

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.mycompany</groupId>
    <artifactId>KafkaStreamExample</artifactId>
    <version>1.0-SNAPSHOT</version>
    <packaging>jar</packaging>
    <dependencies>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-streams</artifactId>
            <version>2.0.0</version>
        </dependency>
    </dependencies>
    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
    </properties>
</project>

Scala-код:

object KafkaStreamExample extends App {

  val props: Properties = {
    val p = new Properties()
    p.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-application")
    p.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.1.93:9092")
    p.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String.getClass)
    p.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String.getClass)
    p
  }
  val builder: StreamsBuilder = new StreamsBuilder()
  val sourceStream: KStream[String, String] = builder.stream[String, String]("wordCountInput")

  val wordCounts: KTable[String, String] = sourceStream
    .flatMapValues(textLine => textLine.toLowerCase.split("\\W+"))
    .groupBy((key, word) => word)
    .count()
    .mapValues((key, value) => value.toString)

  wordCounts.toStream.to("wordCountOutput")

  sourceStream.to("sinkWordCount")

  val stream: KafkaStreams = new KafkaStreams(builder.build(), props)

  sys.ShutdownHookThread {
    stream.close()
  }

  stream.cleanUp()
  stream.start()
}

Scala SBT:

name := "KafkaStreamExample"

version := "0.1"

scalaVersion := "2.12.6"

libraryDependencies += "org.apache.kafka" %% "kafka-streams-scala" % "2.0.0"
libraryDependencies += "javax.ws.rs" % "javax.ws.rs-api" % "2.1" artifacts( Artifact("javax.ws.rs-api", "jar", "jar"))

Kafka Версия: kafka_2.11-2.0.0

так что любой может сказать мне, почему пример scala не выполняется так быстро, как пример java.

Обновление

Установка cache.max.bytes.buffering этого свойства в 0 в коде Scalaдает мгновенный вывод WordCount в Scala, но это не рекомендуется использовать в производственной среде в соответствии с этим ответом Stackoverflow также в Java-коде нам не нужно устанавливать это свойство, но все же Java дает быстрый вывод

...