Я потребляю Kafka
данных и затем передаю данные в HDFS
.
Данные, хранящиеся в Kafka
теме trial
, выглядят так:
hadoop
hive
hive
kafka
hive
ОднакоКогда я отправляю свои коды, он возвращает:
Исключение в теме "main"
org.apache.spark.sql.streaming.StreamingQueryException: Text data source supports only a single column, and you have 7 columns.;
=== Streaming Query ===
Identifier: [id = 2f3c7433-f511-49e6-bdcf-4275b1f1229a, runId = 9c0f7a35-118a-469c-990f-af00f55d95fb]
Current Committed Offsets: {KafkaSource[Subscribe[trial]]: {"trial":{"2":13,"1":13,"3":12,"0":13}}}
Current Available Offsets: {KafkaSource[Subscribe[trial]]: {"trial":{"2":13,"1":13,"3":12,"0":14}}}
Мой вопрос: как показано выше, данные, хранящиеся в Kafka
, содержат только ОДИН столбецпочему программа говорит, что 7 columns
?
Любая помощь приветствуется.
Мои spark-streaming
коды:
def main(args: Array[String]): Unit = {
val spark = SparkSession
.builder.master("local[4]")
.appName("SpeedTester")
.config("spark.driver.memory", "3g")
.getOrCreate()
val ds = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "192.168.95.20:9092")
.option("subscribe", "trial")
.option("startingOffsets" , "earliest")
.load()
.writeStream
.format("text")
.option("path", "hdfs://192.168.95.21:8022/tmp/streaming/fixed")
.option("checkpointLocation", "/tmp/checkpoint")
.start()
.awaitTermination()
}