writeStream () печатает нулевые значения в пакетных данных, даже если я предоставляю правильные json данные в kafka через writeStream () - PullRequest
0 голосов
/ 23 апреля 2020

Я пытаюсь преобразовать json, используя схему и печатая значения в консоль, но writeStream () печатает нулевые значения во всех столбцах, даже если я дал правильные данные.

данные, которые я передаю kafka topi c ..

{"stock":"SEE","buy":12,"sell":15,"profit":3,quantity:27,"loss":0,"gender":"M"}
{"stock":"SEE","buy":12,"sell":15,"profit":3,quantity:27,"loss":0,"gender":"M"}
{"stock":"SEE","buy":12,"sell":15,"profit":3,quantity:27,"loss":0,"gender":"M"}

Ниже мой scala код ..

 val readStreamDFInd = sparkSession.readStream.format("kafka").option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "IndiaStocks")
  .option("startingOffsets", "earliest")
  .load()

//readStreamDFInd.printSchema()
val readStreamDFUS = sparkSession.readStream.format("kafka").option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "USStocks")
  .option("startingOffsets", "earliest")
  .load()

val schema = new StructType()
  .add("stock", StringType)
  .add("buy", IntegerType)
  .add("sell", IntegerType)
  .add("profit", IntegerType)
  .add("quantity", IntegerType)
  .add("loss", IntegerType)
  .add("gender", StringType)

val stocksIndia = readStreamDFInd.selectExpr("CAST(value as STRING) as json").select(from_json($"json", schema).as("data")).select("data.*")
val stocksUSA = readStreamDFUS.selectExpr("CAST(value as STRING) as json").select(from_json($"json", schema).as("data")).select("data.*")
stocksIndia.printSchema() stocksUSA.writeStream
  .format("console")
  .outputMode("append").trigger(Trigger.ProcessingTime("5 seconds"))
  .start()
  .awaitTermination()

Below is the null values getting printed for all columns even though i provide proper data

Ответы [ 2 ]

1 голос
/ 23 апреля 2020

Код работает нормально, как вы можете видеть в книге .

При просмотре документации по функции from_json значения null создаются, потому что строка не разбирается .

=> Вам не хватает кавычек вокруг поля quantity в строке json.

0 голосов
/ 23 апреля 2020

Проблема в ваших данных кафки, столбец количество должен быть в кавычках. Пожалуйста, проверьте ниже.

{"акции": "СМОТРЕТЬ", "купить": 12, "продать": 15, "прибыль": 3, "количество" : 27 " убыток ": 0," пол ":" M "} {" акции ":" СМ. "," покупка ": 12," продажа ": 15," прибыль ": 3, " количество ": 27, "убыток": 0, "пол": "M"} {"акции": "СМ.", "Покупка": 12, "продажа": 15, "прибыль": 3, "количество" : 27, "потери": 0, "пол": "М"}

...