Kafka не может получать сообщения от структурированной потоковой передачи - PullRequest
0 голосов
/ 24 апреля 2019

Spark Structured Streaming записал результаты в kafka, но я не смог найти данные в теме Kafka, используя данные о потреблении консоли kafka.

версия: kafka_2.11-0.10.2.0 spark-2.4.1-bin-without-hadoop scala-2.11.12

object kafkatest {


  def main(args: Array[String]): Unit = {

    val spark = SparkSession.builder().appName("test").getOrCreate()
    val lines = spark.readStream.format("socket")
      .option("host","localhost").option("port",9999).load()
    import spark.implicits._
    val words = lines.as[String].flatMap(_.split(" "))
    val wordcounts = words.groupBy("value").count()
    val query = wordcounts
    .select(wordcounts("value").as("value")
    .writeStream
    .outputMode("complete").format("kafka")
    .option("kafka.bootstrap.servers","192.168.234.10:9092")
    .option("topic","hello")
    .start()
    query.awaitTermination()

Затем я использую консоль Kafka для приема сообщений:

./kafka-console-consumer.sh --zookeeper localhost:2181 --topic hello

Но я не смог получить никаких сообщений на консоли Kafka,Структурированное потоковое вещание сработало и вычислило результаты.

Я обнаружил, что журнал похож на это.

19/04/25 09:14:50 INFO MicroBatchExecution: Streaming query made progress: {
  "id" : "49806357-3bb4-4cc8-b726-933118e4f9f1",
  "runId" : "7ff3b925-a1f0-47f0-b5eb-aaed96910cd4",
  "name" : null,
  "timestamp" : "2019-04-25T01:14:50.009Z",
  "batchId" : 1,
  "numInputRows" : 0,
  "inputRowsPerSecond" : 0.0,
  "processedRowsPerSecond" : 0.0,
  "durationMs" : {
    "getEndOffset" : 0,
    "setOffsetRange" : 3,
    "triggerExecution" : 5
  },
  "stateOperators" : [ ],
  "sources" : [ {
    "description" : "KafkaV2[Subscribe[hello]]",
    "startOffset" : {
      "hello" : {
        "0" : 8
      }
    },
    "endOffset" : {
      "hello" : {
        "0" : 8
      }
    },
    "numInputRows" : 0,
    "inputRowsPerSecond" : 0.0,
    "processedRowsPerSecond" : 0.0
  } ],
  "sink" : {
    "description" : "org.apache.spark.sql.execution.streaming.ConsoleSinkProvider@318cfc08"
  }
}

Почему сток не является кафкой?Но я уже настроил его.

 "sink" : {
    "description" : "org.apache.spark.sql.execution.streaming.ConsoleSinkProvider@318cfc08"
  }

Я отправил приложение spark с консолью spark-submit

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...