Я пытаюсь преобразовать json, используя схему и печатая значения в консоль, но writeStream () печатает нулевые значения во всех столбцах, даже если я дал правильные данные.
данные, которые я передаю kafka topi c ..
{"stock":"SEE","buy":12,"sell":15,"profit":3,quantity:27,"loss":0,"gender":"M"}
{"stock":"SEE","buy":12,"sell":15,"profit":3,quantity:27,"loss":0,"gender":"M"}
{"stock":"SEE","buy":12,"sell":15,"profit":3,quantity:27,"loss":0,"gender":"M"}
Ниже мой scala код ..
val readStreamDFInd = sparkSession.readStream.format("kafka").option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", "IndiaStocks")
.option("startingOffsets", "earliest")
.load()
//readStreamDFInd.printSchema()
val readStreamDFUS = sparkSession.readStream.format("kafka").option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", "USStocks")
.option("startingOffsets", "earliest")
.load()
val schema = new StructType()
.add("stock", StringType)
.add("buy", IntegerType)
.add("sell", IntegerType)
.add("profit", IntegerType)
.add("quantity", IntegerType)
.add("loss", IntegerType)
.add("gender", StringType)
val stocksIndia = readStreamDFInd.selectExpr("CAST(value as STRING) as json").select(from_json($"json", schema).as("data")).select("data.*")
val stocksUSA = readStreamDFUS.selectExpr("CAST(value as STRING) as json").select(from_json($"json", schema).as("data")).select("data.*")
stocksIndia.printSchema() stocksUSA.writeStream
.format("console")
.outputMode("append").trigger(Trigger.ProcessingTime("5 seconds"))
.start()
.awaitTermination()