Как передать dataframe в спд udf? - PullRequest
0 голосов
/ 13 марта 2019

Я хочу определить udf. В теле функции он будет искать данные из внешнего кадра данных. Как я могу это сделать? Я пытался передать датафрейм в формате udf. Но это не может работать.

Пример кода:

val countryDF = spark.read
  .option("inferSchema", "true")
  .option("header", "true")
  .csv("Country.csv")

val geo = (originString: String, dataFrame: DataFrame) => {
  // Search data from countryDF
  val row = dataFrame.where(col("CountryName") === originString)
  if (row != Nil){
    // set data to row index 2
    row.getAs[String](2)
  }
  else{
    "0"
  }
}
val udfGeo = udf(geo)

val cLatitudeAndLongitude = udfGeo(countryTestDF.col("CountryName"), lit(countryDF))

countryTestDF = countryTestDF.withColumn("Latitude", cLatitudeAndLongitude)

1 Ответ

0 голосов
/ 13 марта 2019

Если вы хотите использовать UDF, вам нужно работать со столбцами, а не с объектом Dataframe. Вы должны создать новый столбец, который будет принимать выходные данные UDF.

def geo(originString : String, CountryName: String) : Int = {

    if (CountryName == originString){
      return 1}
    else{
      return 0}
  }

val geoUDF = udf(geo _)

val newData = countryDF.withColum("isOrignOrNot", geoUDF(col("originString"),col("CountryName"))
...