Spark UDF для разделения значения столбца на несколько столбцов - PullRequest
0 голосов
/ 06 октября 2018

У меня есть столбец данных, называемый значением 'description', в следующем формате

ABC XXXXXXXXXXXX STORE NAME ABC TYPE1

Я хочу разобрать его на 3 разных столбца, как показано ниже

|  mode |  type  |  store       |  description                           |
|------------------------------------------------------------------------|
|  ABC  |  TYPE1 |  STORE NAME  | ABC XXXXXXXXXXXX STORE NAME ABC TYPE1  |

Я попробовал метод, предложенный как здесь .Это работает для простой функции UDF, но не для функции, которую я написал.Сложность заключается в том, что значение store может быть больше 2 слов или без фиксированного количества слов.

def myFunc1: (String => (String, String, String)) = { description =>
      var descripe = description.split(" ")
      val type = descripe(descripe.size - 1)
      descripe = description.substring(description.indexOf("ABC") + 4, description.lastIndexOf("ABC")).split(" ")
      val mode = descripe(0)
      descripe(0) = ""
      val store = descripe.mkString(" ").trim
      (mode, store, type)
    }

val schema = StructType(Array(
  StructField("mode", StringType, true),
  StructField("store", StringType, true),
  StructField("type", StringType, true)
))

val myUDF = udf(myFunc1, schema)

val test = pos.withColumn("test", myUDF(col("description")))
    test.printSchema()
val a =test.withColumn("mode", col("test").getItem("_1"))
    .withColumn("store", col("test").getItem("_2"))
    .withColumn("type", col("test").getItem("_3"))
    .drop(col("test"))

a.printSchema()
a.show(5, false)

Я получаю приведенную ниже ошибку при выполнении

18/10 /06 21:38:02 ОШИБКА Исполнитель: Исключение в задаче 0.0 на этапе 5.0 (TID 5) org.apache.spark.SparkException: Не удалось выполнить пользовательскую функцию ($ anonfun $ myFunc1 $ 1 $ 1: (строка) => struct (mode): string, store: string, type: string)) в org.apache.spark.sql.catalyst.expressions.GeneratedClass $ GeneratedIterator.processNext (неизвестный источник) в org.apache.spark.sql.execution.BufferedRowIterator.hasNext (BufferedRowIterator.java: 43) at org.apache.spark.sql.execution.WholeStageCodegenExec $$ anonfun $ 8 $$ anon $ 1.hasNext (WholeStageCodegenExec.scala: 395) в org.apache.spark.sql.execution.SparkPlan $$ anonfun $.apply (SparkPlan.scala: 234) в org.apache.spark.sql.execution.SparkPlan $$ anonfun $ 2.apply (SparkPlan.scala: 228) в org.apache.spark.rdd.RDD $$ anonfun $ mapPartitionsInternal $ 1$$ anonfun $ apply $ 25.apply (RDD.scala: 827) в org.apache.spark.rdd.RDD $$ anonfun $ mapPartitionsInternal $ 1 $$ anonfun $ применить $ 25.apply (RDD.scala: 827) в org.apache.spark.rdd.MapPartitionsRDD.compute (MapPartitionsRDD.scala: 38) в org.apache.spark.rdd.RDD.computeOrReadCheckpoint (RDD.scala: 323) в org.apache.spark.rdd.RDD.iterator (RDD.scala: 287) в org.apache.spark.scheduler.ResultTask.runTask (ResultTask.scala: 87) или.apache.spark.scheduler.Task.run (Task.scala: 108) в org.apache.spark.executor.Executor $ TaskRunner.run (Executor.scala: 338) в java.util.concurrent.ThreadPoolExecutor.runWorker (ThreadPoolExecutor.java: 1149) в java.util.concurrent.ThreadPoolExecutor $ Worker.run (ThreadPoolExecutor.java:624) в java.lang.Thread.run (Thread.java:748) Причина: java.lang.StringIndexOutOfBoundsException: строковый индексвне диапазона: -4 в java.lang.String.substring (String.java:1967) в com.hasif.bank.track.trasaction.TransactionParser $$ anonfun $ myFunc1 $ 1 $ 1.apply (TransactionParser.scala: 26) вcom.hasif.bank.track.trasaction.TransactionParser $$ anonfun $ myFunc1 $ 1 $ 1.Apply (TransactionParser.scala: 22) ... еще 16

Любые указатели на это будут оценены.

1 Ответ

0 голосов
/ 06 октября 2018

Проверьте это.

scala> val df = Seq("ABC XXXXXXXXXXXX STORE NAME ABC TYPE1").toDF("desc")
df: org.apache.spark.sql.DataFrame = [desc: string]

scala> df.withColumn("mode",split('desc," ")(0)).withColumn("type",split('desc," ")(5)).withColumn("store",concat(split('desc," ")(2), lit(" "), split('desc," ")(3))).show(false)
+-------------------------------------+----+-----+----------+
|desc                                 |mode|type |store     |
+-------------------------------------+----+-----+----------+
|ABC XXXXXXXXXXXX STORE NAME ABC TYPE1|ABC |TYPE1|STORE NAME|
+-------------------------------------+----+-----+----------+


scala>

Обновление1:

scala> def splitStore(x:String):String=
     | return x.split(" ").drop(2).init.init.mkString(" ")
splitStore: (x: String)String

scala> val mysplitstore = udf(splitStore(_:String):String)
mysplitstore: org.apache.spark.sql.expressions.UserDefinedFunction = UserDefinedFunction(<function1>,StringType,Some(List(StringType)))

scala> val df2 = Seq("ABC XXXXXXXXXXXX STORE NAME XYZ ABC TYPE1").toDF("desc")
df2: org.apache.spark.sql.DataFrame = [desc: string]

scala> val df3 = df2.withColumn("length",split('desc," "))
df3: org.apache.spark.sql.DataFrame = [desc: string, length: array<string>]

scala> val df4 = df3.withColumn("mode",split('desc," ")(size('length)-2)).withColumn("type",split('desc," ")(size('length)-1)).withColumn("store",mysplitstore('desc))
df4: org.apache.spark.sql.DataFrame = [desc: string, length: array<string> ... 3 more fields]

scala> df4.drop('length).show(false)
+-----------------------------------------+----+-----+--------------+
|desc                                     |mode|type |store         |
+-----------------------------------------+----+-----+--------------+
|ABC XXXXXXXXXXXX STORE NAME XYZ ABC TYPE1|ABC |TYPE1|STORE NAME XYZ|
+-----------------------------------------+----+-----+--------------+


scala>
...