Возникла проблема при использовании SparkUDF с несколькими аргументами - PullRequest
0 голосов
/ 21 января 2019

Я пытаюсь закодировать данные, используя SHA-256, передавая их в качестве аргумента в Spark UDF, но получая ошибку ниже. Пожалуйста, найдите фрагмент программы и подробности ошибки ниже.

Фрагмент кода:

package com.sample
import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import java.security.MessageDigest
import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.UserDefinedFunction
import javax.xml.bind.DatatypeConverter;
import org.apache.spark.sql.Column

object Customer {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("Customer-data").setMaster("local[2]").set("spark.executor.memory", "1g");

    val sc = new SparkContext(conf)

    val spark = SparkSession.builder().config(sc.getConf).getOrCreate()
    //val hash_algm=sc.getConf.get("halgm")
    val hash_algm="SHA-256"

    val df = spark.read.format("csv").option("header", "true").load("file:///home/tcs/Documents/KiranDocs/Data_files/sample_data")
    spark.udf.register("encriptedVal1", encriptedVal)
    //calling encription UDF function
    //val resDF1 = df.withColumn(("ssn_number"), encriptedVal(df("customer_id"))).show()
    val resDF2 = df.withColumn(("ssn_number"), encriptedVal(array("customer_id", hash_algm))).show()


    println("data set"+resDF2)   


    sc.stop()

  }
   def encriptedVal = udf((s: String,s1:String) => {
    val digest = MessageDigest.getInstance(s1)
    val hash = digest.digest(s.getBytes("UTF-8"))
    DatatypeConverter.printHexBinary(hash)
  })

}

Подробности ошибок ниже:

Исключение в теме "главная" 2019-01-21 19:42:48 ИНФОРМАЦИЯ SparkContext: 54 - Вызов stop () из ловушки отключения java.lang.ClassCastException: com.sample.Customer $$ anonfun $ encriptedVal $ 1 не может быть приведен к scala.Function1 at . Org.apache.spark.sql.catalyst.expressions.ScalaUDF (ScalaUDF.scala: 104) в org.apache.spark.sql.expressions.UserDefinedFunction.apply (UserDefinedFunction.scala: 85) в com.sample.Customer $ .main (Customer.scala: 26) в com.sample.Customer.main (Customer.scala)

1 Ответ

0 голосов
/ 21 января 2019

Проблема здесь в том, как вы называете определенный UDF.Вы должны использовать его следующим образом:

val resDF1 = df.withColumn(("ssn_number"), encriptedVal(df.col("customer_id"), lit(hash_algm)))

, потому что он принимает два объекта Column s (оба Column s должны иметь тип String, как определено в вашей пользовательской функции).

...