Spark SQL - Scala Программа: первый столбец должен содержать наименьшую строку среди (col1, col2) каждой строки - PullRequest
0 голосов
/ 23 февраля 2020

Нужен вывод, где первый столбец должен содержать наименьшую строку из (column1, column2) каждой строки.

Я получаю ошибку: Исключение в потоке "main" org. apache .spark.SparkException: Задача не сериализуема

Я пытаюсь использовать UDF для возврата минимального и максимального значения двух строк и использовать его в операторе sql.

"выбрать minUDF (имя1, имя2), maxUDF (имя1, имя2) из ​​друзей"

Не знаю, где я делаю неправильно. Может кто-нибудь помочь мне найти ошибку?

Input :           Required Output :
+-----+-----+     +-----+-----+
|name1|name2|     |name1|name2|
+-----+-----+     +-----+-----+
| shir| amit|     | amit| shir|
| bane| shir|     | bane| shir|
| shir| raj |     |  raj| shir|
| amit| shir|     | amit| shir|
| xiag| alan|     | alan| xiag|
| shir|  raj|     |  raj| shir|
+-----+-----+     +-----+-----+

import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.types.{StringType, StructField, StructType}
import org.apache.spark.sql.{Row, SparkSession}

object test {

  def main(args: Array[String]): Unit = {


    val spark=SparkSession.builder().appName("test").master("local[*]").getOrCreate();
    val sc=spark.sparkContext

    case class friends(name1:String,name2:String)
    val rdd=sc.parallelize(Seq(Row("shir","amit"),Row("amit","shir"),Row("raj","shir"),Row("amit","shir"),Row("raj","shir"),Row("shir","raj")))

    val schema=StructType(Array(
      StructField("name1",StringType,true),
      StructField("name2",StringType,true)
    ))

    val df=spark.createDataFrame(rdd, schema)
    df.show()

    val minfun = (str1:String,str2:String)=>{

      if(str1.compareTo(str2)<0)
        return str1
      else
        return str2

    }

    val maxfun = (str1:String,str2:String)=>{

      if(str1.compareTo(str2)>0)
        return str1
      else
        return str2

    }

    spark.udf.register("minUDF",minfun)
    spark.udf.register("maxUDF",maxfun)

    df.createOrReplaceTempView("friends")
    val ddd = spark.sql("select minUDF(name1,name2), maxUDF(name1,name2) from friends")
    ddd.show

    spark.stop()

  }
}```

1 Ответ

2 голосов
/ 23 февраля 2020

Пожалуйста, не пишите UDF, если у нас есть встроенные функции для него:

Попробуйте использовать SQL наименьшую и наибольшую функцию для вычисления минимума и максимума:

import org.apache.spark.sql.functions.least

import org.apache.spark.sql.functions.greatest

val ddd = spark.sql("select least(name1,name2), greatest(name1,name2) from friends")

ddd.show

Также старайтесь избегать использования UDF, потому что они должны сериализоваться среди всех рабочих узлов, что повлечет за собой затраты на сериализацию и десериализацию.

...