У меня есть следующий фрейм данных: скажем, DF1 как
root
|-- VARIANTS: string (nullable = true)
|-- VARIANT_ID: long (nullable = false)
|-- CASE_ID: string (nullable = true)
|-- APP_ID: integer (nullable = false)
Где варианты (строка) выглядят так:
Activity_1, Activity_2, Activity_2, Activity_3, Activity_5...
Я пытаюсь получить новый столбец, например
Variants_stats как (на строку):
Activity_1: 1, Activity_2: 2, Activity_3: 1, Activity_5: 1
Подход, который я до сих пор использовал: 1) Создайте UDF:
val countActivityFrequences = udf((value: String) => value.split(",").map(_.trim).groupBy(identity).mapValues(_.length).map{case (k, v) => k + ":" + v}.mkString(","))
val dfNew = df1.withColumn("Variants_stats", countActivityFrequences($"VARIANTS"))
Кажется, все в порядке (по крайней мере, искра не 'не жалею), пока я не попытаюсь сделать какой-либо SQL или dfNew.show (ложный) вызов, который всегда возвращает мне:
java.lang.StringIndexOutOfBoundsException: String index out of range: -84
at java.lang.String.substring(String.java:1931)
at java.lang.Class.getSimpleBinaryName(Class.java:1448)
at java.lang.Class.getSimpleName(Class.java:1309)
at org.apache.spark.sql.catalyst.expressions.ScalaUDF.udfErrorMessage$lzycompute(ScalaUDF.scala:1055)
at org.apache.spark.sql.catalyst.expressions.ScalaUDF.udfErrorMessage(ScalaUDF.scala:1054)
at org.apache.spark.sql.catalyst.expressions.ScalaUDF.doGenCode(ScalaUDF.scala:1006)
at org.apache.spark.sql.catalyst.expressions.Expression$$anonfun$genCode$2.apply(Expression.scala:108)
at org.apache.spark.sql.catalyst.expressions.Expression$$anonfun$genCode$2.apply(Expression.scala:105)
at scala.Option.getOrElse(Option.scala:121)
Я не могу понять, что здесь происходит неправильно?
Использую Spark 2.1 +
Для воспроизведения:
val items = List(
"A_001,A_002,A_010,A_0200,A_0201,A_0201,A_0202,A_0206,A_0207,A_0208,A_0208,A_0209,A_070,A_071,A_072,A_073,A_073,A_074",
"A_001,A_002,A_010,A_0201,A_0201,A_0201,A_0202,A_0206,A_0207,A_0208,A_0208,A_0209,A_070,A_071,A_072,A_073,A_073,A_073")
val df = sc.parallelize(items).toDF("VARIANTS")
df.show(false)
df.printSchema
// create UDF function
val countActivityFrequences = udf((value: String) => value.split(",").map(_.trim).groupBy(identity).mapValues(_.length).map{case (k, v) => k + ":" + v}.mkString(","))
// Apply UDF against our little DF
var dfNew = df.withColumn("Variants_stats", countActivityFrequences($"VARIANTS"))
dfNew.printSchema
// Error Thrown : (either Malforned class name, or java.lang.StringIndexOutOfBoundsException )
dfNew.show(false)
Обновление :
Проблема возникала только в нашей среде AWS EMR, под цеппелином.Перезапуск переводчика заставил его работать.