Как исправить: java.lang.StringIndexOutOfBoundsException из искровой функции UDF - PullRequest
2 голосов
/ 29 апреля 2019

У меня есть следующий фрейм данных: скажем, DF1 как

root
 |-- VARIANTS: string (nullable = true)
 |-- VARIANT_ID: long (nullable = false)
 |-- CASE_ID: string (nullable = true)
 |-- APP_ID: integer (nullable = false)

Где варианты (строка) выглядят так:

Activity_1, Activity_2, Activity_2, Activity_3, Activity_5...

Я пытаюсь получить новый столбец, например

Variants_stats как (на строку):

Activity_1: 1, Activity_2: 2, Activity_3: 1, Activity_5: 1

Подход, который я до сих пор использовал: 1) Создайте UDF:

val countActivityFrequences = udf((value: String) => value.split(",").map(_.trim).groupBy(identity).mapValues(_.length).map{case (k, v) => k + ":" + v}.mkString(","))
val dfNew = df1.withColumn("Variants_stats", countActivityFrequences($"VARIANTS"))

Кажется, все в порядке (по крайней мере, искра не 'не жалею), пока я не попытаюсь сделать какой-либо SQL или dfNew.show (ложный) вызов, который всегда возвращает мне:

java.lang.StringIndexOutOfBoundsException: String index out of range: -84
    at java.lang.String.substring(String.java:1931)
    at java.lang.Class.getSimpleBinaryName(Class.java:1448)
    at java.lang.Class.getSimpleName(Class.java:1309)
    at org.apache.spark.sql.catalyst.expressions.ScalaUDF.udfErrorMessage$lzycompute(ScalaUDF.scala:1055)
    at org.apache.spark.sql.catalyst.expressions.ScalaUDF.udfErrorMessage(ScalaUDF.scala:1054)
    at org.apache.spark.sql.catalyst.expressions.ScalaUDF.doGenCode(ScalaUDF.scala:1006)
    at org.apache.spark.sql.catalyst.expressions.Expression$$anonfun$genCode$2.apply(Expression.scala:108)
    at org.apache.spark.sql.catalyst.expressions.Expression$$anonfun$genCode$2.apply(Expression.scala:105)
    at scala.Option.getOrElse(Option.scala:121)

Я не могу понять, что здесь происходит неправильно?

Использую Spark 2.1 +

Для воспроизведения:

val items = List(
    "A_001,A_002,A_010,A_0200,A_0201,A_0201,A_0202,A_0206,A_0207,A_0208,A_0208,A_0209,A_070,A_071,A_072,A_073,A_073,A_074",
    "A_001,A_002,A_010,A_0201,A_0201,A_0201,A_0202,A_0206,A_0207,A_0208,A_0208,A_0209,A_070,A_071,A_072,A_073,A_073,A_073")
val df = sc.parallelize(items).toDF("VARIANTS")
df.show(false)
df.printSchema

// create UDF function
val countActivityFrequences = udf((value: String) => value.split(",").map(_.trim).groupBy(identity).mapValues(_.length).map{case (k, v) => k + ":" + v}.mkString(","))
// Apply UDF against our little DF
var dfNew = df.withColumn("Variants_stats", countActivityFrequences($"VARIANTS"))
dfNew.printSchema
// Error Thrown : (either Malforned class name, or java.lang.StringIndexOutOfBoundsException )
dfNew.show(false) 

Обновление :

Проблема возникала только в нашей среде AWS EMR, под цеппелином.Перезапуск переводчика заставил его работать.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...