Как проанализировать ошибку несоответствия типов с помощью набора данных Spark и UDF - PullRequest
0 голосов
/ 12 января 2019

Я работаю над двумя CSV-файлами для объединения данных и создания JSON Payload с использованием библиотеки json4s. Я сталкиваюсь с проблемой в отображении строки набора искровых данных с UDF.

Я пытался создать простой UDF, принимающий строку и возвращающий жестко закодированные значения. Вопрос остается прежним.

val station_data = spark.read.format("csv").option("sep", ",").option("inferSchema", "false").option("header", "true").load("gs://loyds-assignment/station_data.csv").drop("lat").drop("long").drop("dockcount").drop("installation")
val trip_data = spark.read.format("csv").option("sep", ",").option("inferSchema", "false").option("header", "true").load("gs://loyds-assignment/trip_data.csv").drop("Start Date").drop("End Date").drop("Subscriber Type").drop("Zip Code")

val getConcatenated = udf((first: String, second: String) => {
        first + "," + second
      })

val StatStationData = trip_data.join(station_data, col("Start Terminal") === col("station_id"), "inner").withColumn("Start Station", col("name")).withColumn("StartStationlandmark", col("landmark")).drop("name").drop("Start Terminal").drop("station_id").drop("landmark")
val FinalData = StatStationData.join(station_data, col("End Terminal") === col("station_id"), "inner").withColumn("End Station", col("name")).withColumn("Final landmark", when(col("landmark") === col("StartStationlandmark"), col("landmark")).otherwise(getConcatenated($"landmark", $"StartStationlandmark"))).drop("name").drop(("End Terminal")).drop("station_id").drop("landmark").drop("StartStationlandmark")

val FinalDataDf = FinalData.withColumn("TripID", col("Trip ID")).withColumn("EndStation", col("End Station")).withColumn("landmark", split(col("Final landmark"), "\\,")).withColumn("Bike", col("Bike #")).withColumn("StartStation", col("Start Station")).drop("Trip ID").drop("End Station").drop("Final landmark").drop("Bike #").drop("Start Station")

FinalDataDf.show(false)

case class FinalDataStruct(TripID: String, Duration: String, Bike: String, StartStation: String, EndStation: String, landmark: String)

val encoder = org.apache.spark.sql.Encoders.product[FinalDataStruct]

val FinalDataDS = FinalDataDf.as(encoder)

FinalDataDS.show(false)

import spark.sqlContext.implicits._
import org.apache.spark.sql._

import org.json4s._
import org.json4s.JsonDSL._
import org.json4s.jackson.JsonMethods._

def convertRowToJSON(row: Row) = {
  val json =
    ("bike" -> row(3).toString) ~
      ("start_station" -> row(4).toString) ~
      ("end_station" -> row(5).toString) ~
      ("landmarks" -> row(6).toString) ~
      ("total_duration" -> row(2).toString)
  (row(1).toString, compact(render(json)).toString)
}

val JsonPlayloadData = FinalDataDS.map(convertRowToJSON)

// To Test
def convertRowToJSONTtry(row: Row) = {
  (11, "Hello".toString)
}
val JsonPlayloadDataTest1 = FinalDataDS.map(convertRowToJSONTtry)

Я получаю ошибку:

scala> val JsonPlayloadData = FinalDataDS.map(convertRowToJSON)
<console>:42: error: type mismatch;
 found   : org.apache.spark.sql.Row => (String, String)
 required: FinalDataStruct => ?
       val JsonPlayloadData = FinalDataDS.map(convertRowToJSON)

1 Ответ

0 голосов
/ 12 января 2019

Сообщение об ошибке говорит вам почти все, что нужно знать здесь. Вы определили функцию Row => (String, String), в то время как вы отображаете на Dataset[FinalDataStruct] (это не udf) и вам нужно FinalDataStruct => ?.

Если вы хотите использовать это, примените его к DataFrame:

FinalDataDf.map(convertRowToJSON)

Вкл. Dataset[FinalDataStruct] Использование:

import org.json4s._

import org.json4s.jackson.JsonMethods._
import org.json4s.jackson.Serialization
import org.json4s.jackson.Serialization.write

FinalDataDS.map { x =>   
  implicit val formats = DefaultFormats
  (x.TripID, write(x))
}

Хотя на практике было бы предпочтительнее заменить карту на to_json call - Spark Row на JSON .

Дополнительно обратите внимание, что Rows проиндексированы от 0, а не от 1.

...