Объединение UDF в scala Spark - PullRequest
1 голос
/ 17 июня 2020

Я написал следующий код, который работает нормально. Но я хочу объединить UDF, чтобы этот код можно было сжать в пару строк. Пожалуйста, подскажите, как я могу это сделать. Ниже приведен код, который я написал.

val myUdf1 = udf((Number: Long) => ((Number) >> 24) & 255)
val myUdf2 = udf((Number: Long) => ((Number) >> 16) & 255)
val myUdf3 = udf((Number: Long) => ((Number) >> 8) & 255)
val myUdf4 = udf((Number: Long) => (Number) & 255)

val df=Data.withColumn("bitwise 1", myUdf1(Data("Ip")))
  .withColumn("bitwise 2", myUdf2(Data("Ip")))
  .withColumn("bitwise 3", myUdf3(Data("Ip")))
  .withColumn("bitwise 4", myUdf4(Data("Ip")))

val FinalDF =  df.withColumn("FinalIp",concat(col("bitwise 1"),lit("."),col("bitwise 2"),lit("."),col("bitwise 3"),lit("."),col("bitwise 4")))
.drop("bitwise 1").drop("bitwise 2").drop("bitwise 3").drop("bitwise 4")

Ответы [ 2 ]

1 голос
/ 18 июня 2020

Как предложил @Someshwar Kale, вы можете обойтись без UDF.

Если вы решите использовать UDF, вы можете абстрагироваться от функций в своих UDF и объединять их в одну функцию

scala> Data.show
+---+
| ip|
+---+
| 10|
| 20|
+---+


scala> val a:Seq[(Int, Int)] = Seq((24, 255), (16,255), (8, 255),(0,255))
a: Seq[(Int, Int)] = List((24,255), (16,255), (8,255), (0,255))

scala> val myUdf = udf((number: Long) => (a.map((t:(Int, Int)) => (number >> t._1) & t._2).mkString(".")))
myUdf: org.apache.spark.sql.expressions.UserDefinedFunction = UserDefinedFunction(<function1>,StringType,Some(List(LongType)))

scala> Data.withColumn("finalIp", myUdf($"ip")).show
+---+--------+
| ip| finalIp|
+---+--------+
| 10|0.0.0.10|
| 20|0.0.0.20|
+---+--------+
1 голос
/ 17 июня 2020

Думаю, этого можно достичь без udf -

WIth UDF

val Data = spark.range(2).withColumn("Ip", lit(10))
    val myUdf1 = udf((Number: Long) => ((Number) >> 24) & 255)
    val myUdf2 = udf((Number: Long) => ((Number) >> 16) & 255)
    val myUdf3 = udf((Number: Long) => ((Number) >> 8) & 255)
    val myUdf4 = udf((Number: Long) => (Number) & 255)

    val df=Data.withColumn("bitwise 1", myUdf1(Data("Ip")))
      .withColumn("bitwise 2", myUdf2(Data("Ip")))
      .withColumn("bitwise 3", myUdf3(Data("Ip")))
      .withColumn("bitwise 4", myUdf4(Data("Ip")))

    val FinalDF =  df.withColumn("FinalIp",concat(col("bitwise 1"),lit("."),col("bitwise 2"),lit("."),col("bitwise 3"),lit("."),col("bitwise 4")))
      .drop("bitwise 1").drop("bitwise 2").drop("bitwise 3").drop("bitwise 4")
    FinalDF.show(false)

    /**
      * +---+---+--------+
      * |id |Ip |FinalIp |
      * +---+---+--------+
      * |0  |10 |0.0.0.10|
      * |1  |10 |0.0.0.10|
      * +---+---+--------+
      */

Без UDF

 spark.range(2).withColumn("Ip", lit(10))
      .withColumn("FinalIp",
        concat_ws(".", expr("shiftRight(Ip, 24) & 255"), expr("shiftRight(Ip, 16) & 255"),
          expr("shiftRight(Ip, 8) & 255"), expr("Ip & 255"))
      ).show(false)

    /**
      * +---+---+--------+
      * |id |Ip |FinalIp |
      * +---+---+--------+
      * |0  |10 |0.0.0.10|
      * |1  |10 |0.0.0.10|
      * +---+---+--------+
      */
...