У меня есть приложение для потокового Python со структурой pyspark, настроенное так:
from pyspark.sql import SparkSession
spark = SparkSession\
.builder\
.appName("data streaming app")\
.getOrCreate()
data_raw = spark.readStream\
.format("kafka")\
.option("kafka.bootstrap.servers", "kafkahost:9092")\
.option("subscribe", "my_topic")\
.load()
query = data_raw.writeStream\
.outputMode("append")\
.format("console")\
.option("truncate", "false")\
.trigger(processingTime="5 seconds")\
.start()\
.awaitTermination()
И все, что появляется, это
+---+-----+-----+---------+------+---------+-------------+
|key|value|topic|partition|offset|timestamp|timestampType|
+---+-----+-----+---------+------+---------+-------------+
+---+-----+-----+---------+------+---------+-------------+
19/03/04 22:00:50 INFO streaming.StreamExecution: Streaming query made progress: {
"id" : "ab24bd30-6e2d-4c2a-92a2-ddad66906a5b",
"runId" : "29592d76-892c-4b29-bcda-f4ef02aa1390",
"name" : null,
"timestamp" : "2019-03-04T22:00:49.389Z",
"numInputRows" : 0,
"processedRowsPerSecond" : 0.0,
"durationMs" : {
"addBatch" : 852,
"getBatch" : 180,
"getOffset" : 135,
"queryPlanning" : 107,
"triggerExecution" : 1321,
"walCommit" : 27
},
"stateOperators" : [ ],
"sources" : [ {
"description" : "KafkaSource[Subscribe[my_topic]]",
"startOffset" : null,
"endOffset" : {
"my_topic" : {
"0" : 303
}
},
"numInputRows" : 0,
"processedRowsPerSecond" : 0.0
} ],
"sink" : {
"description" : "org.apache.spark.sql.execution.streaming.ConsoleSink@74fad4a5"
}
}
Как видите, my_topic
имеетТам 303 сообщения, но я не могу их показать.Дополнительная информация включает в себя то, что я использую сливной коннектор Kafka JDBC для запроса базы данных оракула и сохранения строк в теме кафки.У меня есть настройки реестра Авро схема с этим.При необходимости я также поделюсь этими файлами свойств.
Кто-нибудь знает, что происходит?