Как использовать стандартную функцию from_json с пользовательской схемой (ошибка: перегружено значение метода from_json с альтернативой)? - PullRequest
0 голосов
/ 21 сентября 2019

Я использую данные JSON из потока AWS Kinesis, но я получаю следующую ошибку при попытке использовать стандартную функцию from_json():

command-5804948:32: error: overloaded method value from_json with alternatives:
  (e: org.apache.spark.sql.Column,schema: org.apache.spark.sql.Column)org.apache.spark.sql.Column <and>
  (e: org.apache.spark.sql.Column,schema: org.apache.spark.sql.types.DataType)org.apache.spark.sql.Column <and>
  (e: org.apache.spark.sql.Column,schema: org.apache.spark.sql.types.StructType)org.apache.spark.sql.Column
 cannot be applied to (String, org.apache.spark.sql.types.StructType)
    .select(from_json("jsonData", dataSchema).as("devices"))

Я попытался использовать оба приведенных ниже определения, чтобы определитьмоя схема:

val dataSchema = new StructType()
        .add("ID", StringType)
        .add("Type", StringType)
        .add("Body", StringType)
        .add("Submitted", StringType)

val dataSchema = StructType(Seq(StructField("ID",StringType,true), StructField("Type",StringType,true), StructField("Body",StringType,true), StructField("Submitted",StringType,true)))

Вот мой код:

import org.apache.spark.sql.types._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.Column
import java.nio.ByteBuffer
import scala.util.Random

val dataSchema = new StructType()
        .add("ID", StringType)
        .add("Type", StringType)
        .add("Body", StringType)
        .add("Submitted", StringType)

// val dataSchema = StructType(Seq(StructField("ID",StringType,true), StructField("Type",StringType,true), StructField("Body",StringType,true), StructField("Submitted",StringType,true)))

val kinesisDF = spark.readStream
    .format("kinesis")
    .option("streamName", "**************************")
    .option("region", "********")
    .option("initialPosition", "TRIM_HORIZON")
    .option("awsAccessKey", "****************")
    .option("awsSecretKey", "************************************")
    .load()

val schemaDF = kinesisDF
    .selectExpr("cast (data as STRING) jsonData")
    .select(from_json("jsonData", dataSchema).as("devices"))
    .select("devices.*")
    .load()

display(schemaDF)

Если вы сделаете следующее:

val str_data = kinesisDF
    .selectExpr("cast (data as STRING) jsonData")

display(str_data)

, вы увидите, что данные потока выглядят как:

{"ID": "1266ee3d99bc-96f942a6-434c-6442-a762", "Type": "BT", "Body": "{\" TN \ ": \" ND \ ",\ "ТД \": \ "JSON: {\\" \\ оч.сл. ": \\" CV \\ "} \", \ "LT \": \ "BT \", \ "TI \": \»9ff2-4749250dd142-793ffb20-eb8e-47f7 \ "\ "CN \": \ "ОД \", \ "ДИ \": \ "ЕВ \", \ "UI \": \ "abc004 \", \"\ ": \" 1234567 \», \ "TT \": \ "2019-09-15T09: 48: 25.0395209Z \", \ "FI \": \ "N / A \", \ "HI \": \ "N / A \", \ "SV \": 6} "," Submitted ":" 2019-09-15 09: 48: 26.079 "}

{" ID ":" c8eb956ee98c-68d668b7-e7a6-9ea2-49a5" , "Тип": "МС", "Тело": "{\" МТ \ ": \" N / A \», \ "ЕР \": \ "N / A \"\ "RQ \": \ "{\\" IA] \\ ": ложь, \\" AN \\ ": нуль, \\" ACI \\ "\\" 1266ee3d99bc-96f942a6-434c-6442-a762 \\ "\\ "CI \\" \\ "отлив \\", \\ "CG \\" \\" 8b8a-4ab17555f2fa-da0c8047-b5a6-4ebe \\», \\ "UI \\" \\ "def211 \\", \\ "UR \\" \\ "\\ ЕШКО", \\ "\\ UL": \\ "\\ SCC"\\ "TI \\" \\ "b9d2-d4f646a15d66-dc519f4a-48c3-4e7b \\", \\ "TN \\": нуль, \\ "\\ MN": нуль, \\ "ЧТЗ \\ ": нуль, \\" \\ PM ": нуль, \\" \\ TS ": нуль, \\" CI \\ "\\" EBC \\», \\ "ALDC \\": нулевая} "," Submitted ":" 2019-09-15 09: 49: 46.901 "}

Значением ключа" Body "является другой JSON / вложенный JSON, поэтому я поместил его как StringTypeв схеме, так что хранится в столбце как есть.Я получаю следующую ошибку при запуске кода выше:

Как это исправить?

1 Ответ

2 голосов
/ 21 сентября 2019

Эта часть ошибки говорит само за себя:

не может применяться к (String, org.apache.spark.sql.types.StructType)

Этоозначает, что есть три различных варианта стандартной функции from_json, и все они ожидают, что объект Column не String.

Вы можете просто исправить это, используя синтаксис $ (или используяcol стандартная функция) следующим образом:

.select(from_json($"jsonData", dataSchema).as("devices"))

Обратите внимание на $ перед именем столбца, которое (неявно) превращает его в Column объект.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...