Mapping simple values from a map into a Spark DataFrame (error)
0 votes
/ April 10, 2020

I recently started using Spark with Scala, and I've run into a situation where I want to map some values from a hashmap/Map onto a DataFrame without having to create a new DataFrame and then perform joins.

I have this DataFrame:

+---+-------+---+----------+---------+
| id|   name|age|      date|genderKey|
+---+-------+---+----------+---------+
|  1|Rodrigo| 30|2019-01-01|     male|
|  2|Roberto| 23|2019-01-01|     male|
|  3|Roberto| 25|2019-01-01|     male|
|  4|Rodrigo| 30|2019-01-01|     male|
|  5|Mariana| 32|2019-01-01|   female|
+---+-------+---+----------+---------+

And this map:

var genderMap = Map[String, String](
    "male" -> "Masculine",
    "female" -> "Feminine"
)
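
For reference, the join-based approach I'm trying to avoid would look roughly like this (a sketch, assuming spark.implicits._ is in scope so toDF is available):

import spark.implicits._

// Turn the map into a two-column lookup DataFrame and left-join it in.
val genderDf = genderMap.toSeq.toDF("genderKey", "genderName")

dfPeople.join(genderDf, Seq("genderKey"), "left").show()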

I want to add a column to the DataFrame with the mapped values via a user-defined function:

val getGenderName = udf((gender: String) => genderMap(gender))

dfPeople
    .withColumn("genderName", getGenderName(col("genderKey")))
    .show()

When I run the show action, I get this error:

sqlfunc: org.apache.spark.sql.expressions.UserDefinedFunction = UserDefinedFunction(<function1>,StringType,Some(List(IntegerType)))
getGender: org.apache.spark.sql.expressions.UserDefinedFunction = UserDefinedFunction(<function1>,StringType,Some(List(StringType)))
java.lang.InternalError: Malformed class name
  at java.lang.Class.getSimpleBinaryName(Class.java:1450)
  at java.lang.Class.getSimpleName(Class.java:1309)
  at org.apache.spark.sql.catalyst.expressions.ScalaUDF.udfErrorMessage$lzycompute(ScalaUDF.scala:1048)
  at org.apache.spark.sql.catalyst.expressions.ScalaUDF.udfErrorMessage(ScalaUDF.scala:1047)
  at org.apache.spark.sql.catalyst.expressions.ScalaUDF.doGenCode(ScalaUDF.scala:1000)
  at org.apache.spark.sql.catalyst.expressions.Expression$$anonfun$genCode$2.apply(Expression.scala:107)
  at org.apache.spark.sql.catalyst.expressions.Expression$$anonfun$genCode$2.apply(Expression.scala:104)
  at scala.Option.getOrElse(Option.scala:121)
  at org.apache.spark.sql.catalyst.expressions.Expression.genCode(Expression.scala:104)
  at org.apache.spark.sql.catalyst.expressions.Alias.genCode(namedExpressions.scala:142)
  at org.apache.spark.sql.execution.ProjectExec$$anonfun$6.apply(basicPhysicalOperators.scala:60)
  at org.apache.spark.sql.execution.ProjectExec$$anonfun$6.apply(basicPhysicalOperators.scala:60)
  at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
  at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
  at scala.collection.immutable.List.foreach(List.scala:381)
  at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
  at scala.collection.immutable.List.map(List.scala:285)
  at org.apache.spark.sql.execution.ProjectExec.doConsume(basicPhysicalOperators.scala:60)
  at org.apache.spark.sql.execution.CodegenSupport$class.consume(WholeStageCodegenExec.scala:181)
  at org.apache.spark.sql.execution.InputAdapter.consume(WholeStageCodegenExec.scala:354)
  at org.apache.spark.sql.execution.InputAdapter.doProduce(WholeStageCodegenExec.scala:383)
  at org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:88)
  at org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:83)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
  at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
  at org.apache.spark.sql.execution.CodegenSupport$class.produce(WholeStageCodegenExec.scala:83)
  at org.apache.spark.sql.execution.InputAdapter.produce(WholeStageCodegenExec.scala:354)
  at org.apache.spark.sql.execution.ProjectExec.doProduce(basicPhysicalOperators.scala:45)
  at org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:88)
  at org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:83)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
  at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
  at org.apache.spark.sql.execution.CodegenSupport$class.produce(WholeStageCodegenExec.scala:83)
  at org.apache.spark.sql.execution.ProjectExec.produce(basicPhysicalOperators.scala:35)
  at org.apache.spark.sql.execution.BaseLimitExec$class.doProduce(limit.scala:70)
  at org.apache.spark.sql.execution.LocalLimitExec.doProduce(limit.scala:97)
  at org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:88)
  at org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:83)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
  at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
  at org.apache.spark.sql.execution.CodegenSupport$class.produce(WholeStageCodegenExec.scala:83)
  at org.apache.spark.sql.execution.LocalLimitExec.produce(limit.scala:97)
  at org.apache.spark.sql.execution.WholeStageCodegenExec.doCodeGen(WholeStageCodegenExec.scala:524)
  at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:576)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
  at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
  at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
  at org.apache.spark.sql.execution.SparkPlan.getByteArrayRdd(SparkPlan.scala:247)
  at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:337)
  at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:38)
  at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collectFromPlan(Dataset.scala:3273)
  at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2484)
  at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2484)
  at org.apache.spark.sql.Dataset$$anonfun$52.apply(Dataset.scala:3254)
  at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:77)
  at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3253)
  at org.apache.spark.sql.Dataset.head(Dataset.scala:2484)
  at org.apache.spark.sql.Dataset.take(Dataset.scala:2698)
  at org.apache.spark.sql.Dataset.showString(Dataset.scala:254)
  at org.apache.spark.sql.Dataset.show(Dataset.scala:723)
  at org.apache.spark.sql.Dataset.show(Dataset.scala:682)
  at org.apache.spark.sql.Dataset.show(Dataset.scala:691)
  ... 118 elided
Caused by: java.lang.StringIndexOutOfBoundsException: String index out of range: -10
  at java.lang.String.substring(String.java:1931)
  at java.lang.Class.getSimpleBinaryName(Class.java:1448)
  ... 185 more

Any ideas?

Thanks!

1 Answer

0 votes
/ April 10, 2020

For Spark 2.2+:

You don't need a udf here; instead, use the built-in typedLit function to create the lookup.

Example:

import org.apache.spark.sql.functions._

val df = Seq(
    ("1","Rodrigo","30","2019-01-01","male"),
    ("1","Rodrigo","30","2019-01-01","female"),
    ("1","Rodrigo","30","2019-01-01","mal")
).toDF("id","name","age","date","genderKey")

val genderMap = typedLit(Map("male" -> "Masculine", "female" -> "Feminine"))
//genderMap: org.apache.spark.sql.Column = keys: [male,female], values: [Masculine,Feminine]

df.withColumn("genderName",coalesce(genderMap(col("genderKey")),lit("not found"))).show()
//+---+-------+---+----------+---------+----------+
//| id|   name|age|      date|genderKey|genderName|
//+---+-------+---+----------+---------+----------+
//|  1|Rodrigo| 30|2019-01-01|     male| Masculine|
//|  1|Rodrigo| 30|2019-01-01|   female|  Feminine|
//|  1|Rodrigo| 30|2019-01-01|      mal| not found|
//+---+-------+---+----------+---------+----------+
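
As a side note, typedLit also accepts an existing Scala Map value directly, so the lookup Column can be built from the question's genderMap instead of an inline literal. A minimal variation (genderKeyMap stands in for the question's map, renamed to avoid clashing with the genderMap Column defined above):

// typedLit can wrap an existing Map value directly.
val genderKeyMap = Map("male" -> "Masculine", "female" -> "Feminine")
val genderLookup = typedLit(genderKeyMap)

df.withColumn("genderName", coalesce(genderLookup(col("genderKey")), lit("not found"))).show()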

Using a UDF:

var genderMap = Map[String, String](
    "male" -> "Masculine",
    "female" -> "Feminine"
)

def getGenderName(genderMap:Map[String,String]) = udf((gender:String) => genderMap.getOrElse(gender,"not found"))

df.withColumn("genderName",getGenderName(genderMap)(col("genderKey"))).show()
//+---+-------+---+----------+---------+----------+
//| id|   name|age|      date|genderKey|genderName|
//+---+-------+---+----------+---------+----------+
//|  1|Rodrigo| 30|2019-01-01|     male| Masculine|
//|  1|Rodrigo| 30|2019-01-01|   female|  Feminine|
//|  1|Rodrigo| 30|2019-01-01|      mal| not found|
//+---+-------+---+----------+---------+----------+
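
As for the original error: java.lang.InternalError: Malformed class name is a known problem where java.lang.Class.getSimpleName fails on the deeply nested classes that Scala 2.11 generates for closures defined in the REPL or inside nested objects; Spark hits it while building the UDF's error message. Wrapping the lambda in a top-level def, as above, generally sidesteps it, and newer versions (Spark 2.4+ / Scala 2.12) no longer trigger it.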