Scala возвращает несколько столбцов в UDF после разрыва строки - PullRequest
0 голосов
/ 24 апреля 2018

Я пытаюсь разорвать строку (технически строки, переданные из столбца в кадре данных) и вернуть эти разбитые строки в виде списка в массив данных. Scala версия 2.11. Я бы предпочел решения для scala или pyspark с помощью udf - потому что внутри udf происходит много всего.

Допустим, у меня есть датафрейм:

val df = List(("123", "a*b*c*d*e*f*x*y*z"), ("124", "g*h*i*j*k*l*m*n*o")).toDF("A", "B")

Результат, который я хочу (в формате udf, потому что там много чего происходит; Scala версия 2.11) -

 A       B
123    ((a, b, c),
        (d, e, f),
        (x, y, z))
124    ((g, h, i),
        (j, k, l), 
        (m, n, o))

Напишите udf, чтобы разбить это и вернуть списки - но я не знаю, как определить или передать схему, чтобы результаты возвращались в фрейм данных в виде трех столбцов.

def testUdf =  udf( (s: String) => { 
  val a = s.split("\\*").take(3).toList
  val b = s.split("\\*").drop(3).take(3).toList
  val c = s.split("\\*").drop(6).take(3).toList
  val abc = (a, b, c).zipped.toList.asInstanceOf[List[String]]
  // println (abc) // This does not work
} )
val df2 = df.select($"A", testUdf($"B").as("B")) // does not work because of type mismatch. 

Я пытался сделать это, но я не знаю, как передать схему в Udf выше:

   val schema = StructType(List(
     StructField("C1", StringType),
     StructField("C2", StringType),
     StructField("C3", StringType)
   ))

Кроме того, после этого я надеюсь выполнить процедуру, описанную в Разбить несколько столбцов в таблице Spark SQL , чтобы взорвать фрейм данных.

Помощь будет принята с благодарностью.

Ответы [ 3 ]

0 голосов
/ 24 апреля 2018

Проблема в том, что ваш UDF возвращает Unit (последний оператор - возвращаемое значение).Я бы предложил следующую процедуру:

val df = List(("123", "a*b*c*d*e*f*x*y*z"), ("124", "g*h*i*j*k*l*m*n*o")).toDF("A", "B")

def testUdf = udf((s: String) => {
  val Array(s1, s2, s3, s4, s5, s6, s7, s8, s9) = s.split(s"\\*")
  Seq(
    (s1, s2, s3),
    (s4, s5, s6),
    (s7, s8, s9)
  )
})

val df2 = df.select($"A", explode(testUdf($"B")).as("B"))

df2.show()

+---+-------+
|  A|      B|
+---+-------+
|123|[a,b,c]|
|123|[d,e,f]|
|123|[x,y,z]|
|124|[g,h,i]|
|124|[j,k,l]|
|124|[m,n,o]|
+---+-------+
0 голосов
/ 25 апреля 2018

Способ, которым вы сгенерировали массивы до zipped, не будет правильно отображать элементы.Один из способов генерирования элементов в требуемом порядке - использовать двумерный массив для предварительной транспонирования элементов перед применением zipped.

. Следующая функция UDF будет 1) разбивать строковый столбец на массив, которыйтранспонируется в двумерный массив, 2) упаковывает строки двумерного массива в массив кортежей и 3) преобразует массив кортежей в кортеж кортежей (т. е. структура структур типа столбца):

val df = Seq(
  ("123", "a*b*c*d*e*f*x*y*z"),
  ("124", "g*h*i*j*k*l*m*n*o")
).toDF("A", "B")

import org.apache.spark.sql.functions._

def splitUdf = udf( (s: String) => {
  val arr = s.split("\\*")
  val arr2d = Array.ofDim[String](3, 3)

  for {
    r <- 0 until 3
    c <- 0 until 3
  } arr2d(r)(c) = arr(c * 3 + r)

  val arrTup = (arr2d(0), arr2d(1), arr2d(2)).zipped.toArray

  (arrTup(0), arrTup(1), arrTup(2))
} )

val df2 = df.select($"A", splitUdf($"B").as("B"))

df2.show(false)
// +---+-------------------------+
// |A  |B                        |
// +---+-------------------------+
// |123|[[a,b,c],[d,e,f],[x,y,z]]|
// |124|[[g,h,i],[j,k,l],[m,n,o]]|
// +---+-------------------------+
0 голосов
/ 24 апреля 2018

Заданный вами udf - от String до Unit - удалите abc из последней строки, чтобы вернуть его. Также обратите внимание, что asInstanceOf [] не меняет тип - у вас все еще есть кортеж. Ниже приведен списоксписков

def testUdf =  udf( (s: String) => { 
  val a = s.split("\\*").take(3).toList
  val b = s.split("\\*").drop(3).take(3).toList
  val c = s.split("\\*").drop(6).take(3).toList
  (a, b, c).zipped.toList.map(t=>List(t._1,t._2,t._3))
} )
...