Spark Structured Streaming записал результаты в kafka, но я не смог найти данные в теме Kafka, используя данные о потреблении консоли kafka.
версия: kafka_2.11-0.10.2.0 spark-2.4.1-bin-without-hadoop scala-2.11.12
object kafkatest {
def main(args: Array[String]): Unit = {
val spark = SparkSession.builder().appName("test").getOrCreate()
val lines = spark.readStream.format("socket")
.option("host","localhost").option("port",9999).load()
import spark.implicits._
val words = lines.as[String].flatMap(_.split(" "))
val wordcounts = words.groupBy("value").count()
val query = wordcounts
.select(wordcounts("value").as("value")
.writeStream
.outputMode("complete").format("kafka")
.option("kafka.bootstrap.servers","192.168.234.10:9092")
.option("topic","hello")
.start()
query.awaitTermination()
Затем я использую консоль Kafka для приема сообщений:
./kafka-console-consumer.sh --zookeeper localhost:2181 --topic hello
Но я не смог получить никаких сообщений на консоли Kafka,Структурированное потоковое вещание сработало и вычислило результаты.
Я обнаружил, что журнал похож на это.
19/04/25 09:14:50 INFO MicroBatchExecution: Streaming query made progress: {
"id" : "49806357-3bb4-4cc8-b726-933118e4f9f1",
"runId" : "7ff3b925-a1f0-47f0-b5eb-aaed96910cd4",
"name" : null,
"timestamp" : "2019-04-25T01:14:50.009Z",
"batchId" : 1,
"numInputRows" : 0,
"inputRowsPerSecond" : 0.0,
"processedRowsPerSecond" : 0.0,
"durationMs" : {
"getEndOffset" : 0,
"setOffsetRange" : 3,
"triggerExecution" : 5
},
"stateOperators" : [ ],
"sources" : [ {
"description" : "KafkaV2[Subscribe[hello]]",
"startOffset" : {
"hello" : {
"0" : 8
}
},
"endOffset" : {
"hello" : {
"0" : 8
}
},
"numInputRows" : 0,
"inputRowsPerSecond" : 0.0,
"processedRowsPerSecond" : 0.0
} ],
"sink" : {
"description" : "org.apache.spark.sql.execution.streaming.ConsoleSinkProvider@318cfc08"
}
}
Почему сток не является кафкой?Но я уже настроил его.
"sink" : {
"description" : "org.apache.spark.sql.execution.streaming.ConsoleSinkProvider@318cfc08"
}
Я отправил приложение spark с консолью spark-submit