Я использую данные JSON из потока AWS Kinesis, но я получаю следующую ошибку при попытке использовать стандартную функцию from_json()
:
command-5804948:32: error: overloaded method value from_json with alternatives:
(e: org.apache.spark.sql.Column,schema: org.apache.spark.sql.Column)org.apache.spark.sql.Column <and>
(e: org.apache.spark.sql.Column,schema: org.apache.spark.sql.types.DataType)org.apache.spark.sql.Column <and>
(e: org.apache.spark.sql.Column,schema: org.apache.spark.sql.types.StructType)org.apache.spark.sql.Column
cannot be applied to (String, org.apache.spark.sql.types.StructType)
.select(from_json("jsonData", dataSchema).as("devices"))
Я попытался использовать оба приведенных ниже определения, чтобы определитьмоя схема:
val dataSchema = new StructType()
.add("ID", StringType)
.add("Type", StringType)
.add("Body", StringType)
.add("Submitted", StringType)
val dataSchema = StructType(Seq(StructField("ID",StringType,true), StructField("Type",StringType,true), StructField("Body",StringType,true), StructField("Submitted",StringType,true)))
Вот мой код:
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.Column
import java.nio.ByteBuffer
import scala.util.Random
val dataSchema = new StructType()
.add("ID", StringType)
.add("Type", StringType)
.add("Body", StringType)
.add("Submitted", StringType)
// val dataSchema = StructType(Seq(StructField("ID",StringType,true), StructField("Type",StringType,true), StructField("Body",StringType,true), StructField("Submitted",StringType,true)))
val kinesisDF = spark.readStream
.format("kinesis")
.option("streamName", "**************************")
.option("region", "********")
.option("initialPosition", "TRIM_HORIZON")
.option("awsAccessKey", "****************")
.option("awsSecretKey", "************************************")
.load()
val schemaDF = kinesisDF
.selectExpr("cast (data as STRING) jsonData")
.select(from_json("jsonData", dataSchema).as("devices"))
.select("devices.*")
.load()
display(schemaDF)
Если вы сделаете следующее:
val str_data = kinesisDF
.selectExpr("cast (data as STRING) jsonData")
display(str_data)
, вы увидите, что данные потока выглядят как:
{"ID": "1266ee3d99bc-96f942a6-434c-6442-a762", "Type": "BT", "Body": "{\" TN \ ": \" ND \ ",\ "ТД \": \ "JSON: {\\" \\ оч.сл. ": \\" CV \\ "} \", \ "LT \": \ "BT \", \ "TI \": \»9ff2-4749250dd142-793ffb20-eb8e-47f7 \ "\ "CN \": \ "ОД \", \ "ДИ \": \ "ЕВ \", \ "UI \": \ "abc004 \", \"\ ": \" 1234567 \», \ "TT \": \ "2019-09-15T09: 48: 25.0395209Z \", \ "FI \": \ "N / A \", \ "HI \": \ "N / A \", \ "SV \": 6} "," Submitted ":" 2019-09-15 09: 48: 26.079 "}
{" ID ":" c8eb956ee98c-68d668b7-e7a6-9ea2-49a5" , "Тип": "МС", "Тело": "{\" МТ \ ": \" N / A \», \ "ЕР \": \ "N / A \"\ "RQ \": \ "{\\" IA] \\ ": ложь, \\" AN \\ ": нуль, \\" ACI \\ "\\" 1266ee3d99bc-96f942a6-434c-6442-a762 \\ "\\ "CI \\" \\ "отлив \\", \\ "CG \\" \\" 8b8a-4ab17555f2fa-da0c8047-b5a6-4ebe \\», \\ "UI \\" \\ "def211 \\", \\ "UR \\" \\ "\\ ЕШКО", \\ "\\ UL": \\ "\\ SCC"\\ "TI \\" \\ "b9d2-d4f646a15d66-dc519f4a-48c3-4e7b \\", \\ "TN \\": нуль, \\ "\\ MN": нуль, \\ "ЧТЗ \\ ": нуль, \\" \\ PM ": нуль, \\" \\ TS ": нуль, \\" CI \\ "\\" EBC \\», \\ "ALDC \\": нулевая} "," Submitted ":" 2019-09-15 09: 49: 46.901 "}
Значением ключа" Body "является другой JSON / вложенный JSON, поэтому я поместил его как StringTypeв схеме, так что хранится в столбце как есть.Я получаю следующую ошибку при запуске кода выше:
Как это исправить?