Как разбить элементы списка на определенное количество столбцов в Spark Scala? - PullRequest
0 голосов
/ 30 января 2019

У меня есть список, содержащий случайное количество элементов

Список Emp

101 [a, b, c, d, e]

102 [q, w, e]

103 [z, x, w, t, e, q, s]

Мне нужно, чтобы результат был разделен на 3 столбца

Emp col1 col2 col3

101 abc

101 de

102 qwe

103 zxw

103 teq

103 s

Ответы [ 4 ]

0 голосов
/ 01 февраля 2019

Это ответ, который я должен был опубликовать, если не использовал UDF.

Здесь мы используем более новый Набор данных , который позволяет нам проще использовать функции Scala непосредственно в полях DS., как RDD. В этом суть .

Действительно, DS - лучшие из двух миров, и принято считать, что UDF не используется, как в другом ответе, но иногда производительность является проблемой.

В любом случае были получены те же результаты, поэтому показан только подход DS.Обратите внимание, что определения DS и DF меняются - они указаны в именах val.

case class X(k: Integer, v: List[String]) 

import org.apache.spark.sql.functions._

val df = Seq(  (102, Array("a", "b", "c")), (103, Array("1", "2", "3", "4", "5", "6", "7", "8")), (104, Array("r"))  ).toDF("k", "v")
val ds = df.as[X]
val df2 = ds.map(x => (x.k, x.v.sliding(3,3).toArray)).withColumnRenamed ("_1", "k" ).withColumnRenamed ("_2", "v") 
val df3 = df2.select($"k", explode($"v").as("v_3")).select($"k", $"v_3"(0).as("v_3_1"), $"v_3"(1).as("v_3_2"), $"v_3"(2).as("v_3_3") )

df3.show(false)

снова приводит к:

+---+-----+-----+-----+
|k  |v_3_1|v_3_2|v_3_3|
+---+-----+-----+-----+
|102|a    |b    |c    |
|103|1    |2    |3    |
|103|4    |5    |6    |
|103|7    |8    |null |
|104|r    |null |null |
+---+-----+-----+-----+
0 голосов
/ 31 января 2019

Проверьте это:

scala> val df = Seq((101,Array("a","b","c","d","e")),(102,Array("q","w","e")),(103,Array("z","x","w","t","e","q","s"))).toDF("emp","list")
df: org.apache.spark.sql.DataFrame = [emp: int, list: array<string>]

scala> df.show(false)
+---+---------------------+
|emp|list                 |
+---+---------------------+
|101|[a, b, c, d, e]      |
|102|[q, w, e]            |
|103|[z, x, w, t, e, q, s]|
+---+---------------------+


scala> val udf_slice = udf( (x:Seq[String]) => x.grouped(3).toList  )
udf_slice: org.apache.spark.sql.expressions.UserDefinedFunction = UserDefinedFunction(<function1>,ArrayType(ArrayType(StringType,true),true),Some(List(ArrayType(StringType,true))))

scala> df.select(col("*"), explode(udf_slice($"list")).as("newlist")).select($"emp", $"newlist"(0).as("col1"), $"newlist"(1).as("col2"), $"newlist"(2).as("col3") ).show(false)
+---+----+----+----+
|emp|col1|col2|col3|
+---+----+----+----+
|101|a   |b   |c   |
|101|d   |e   |null|
|102|q   |w   |e   |
|103|z   |x   |w   |
|103|t   |e   |q   |
|103|s   |null|null|
+---+----+----+----+


scala>

Spark 2.4 - только что попытался реализовать без udfs .. но функция slice () не принимает другие столбцы в качестве параметров для диапазона

val df = Seq((101,Array("a","b","c","d","e")),(102,Array("q","w","e")),(103,Array("z","x","w","t","e","q","s"))).toDF("emp","list")
df.show(false)
val df2 = df.withColumn("list_size_arr",  array_repeat(lit(1), ceil(size('list)/3).cast("int")) )
val df3 = df2.select(col("*"),posexplode('list_size_arr))
val udf_slice = udf( (x:Seq[String],start:Int, end:Int )  => x.slice(start,end) )
df3.withColumn("newlist",udf_slice('list,'pos*3, ('pos+1)*3  )).select($"emp", $"newlist").show(false)

Результаты:

+---+---------------------+
|emp|list                 |
+---+---------------------+
|101|[a, b, c, d, e]      |
|102|[q, w, e]            |
|103|[z, x, w, t, e, q, s]|
+---+---------------------+

+---+---------+
|emp|newlist  |
+---+---------+
|101|[a, b, c]|
|101|[d, e]   |
|102|[q, w, e]|
|103|[z, x, w]|
|103|[t, e, q]|
|103|[s]      |
+---+---------+

Получить в отдельные столбцы

val df4 = df3.withColumn("newlist",udf_slice('list,'pos*3, ('pos+1)*3  )).select($"emp", $"newlist")
df4.select($"emp", $"newlist"(0).as("col1"), $"newlist"(1).as("col2"), $"newlist"(2).as("col3") ).show(false)


+---+----+----+----+
|emp|col1|col2|col3|
+---+----+----+----+
|101|a   |b   |c   |
|101|d   |e   |null|
|102|q   |w   |e   |
|103|z   |x   |w   |
|103|t   |e   |q   |
|103|s   |null|null|
+---+----+----+----+
0 голосов
/ 31 января 2019

Другой подход, не использующий UDF, заключается в следующем: можно использовать скольжение примечаний, но оно включает преобразование в RDD и обратно:

import org.apache.spark.sql._
import org.apache.spark.sql.functions._

// No use of UDF means conversion to RDD and back again.
val data = List( (102, Array("a", "b", "c")), (103, Array("1", "2", "3", "4", "5", "6", "7", "8")), (104, Array("r"))  )
val rdd = sc.parallelize(data)
val df = rdd.toDF("k", "v")

// Make groups of 3 as requested, methods possible. 
val rddX = df.as[(Int, List[String])].rdd // This avoids Row and Any issues that typically crop up.
//val rddY = rddX.map(x => (x._1, x._2.grouped(3).toArray)) 
val rddY = rddX.map(x => (x._1, x._2.sliding(3,3).toArray)) 

// Get k,v's with v the set of 3 and make single columns.
val df2 = rddY.toDF("k", "v")
val df3 = df2.select($"k", explode($"v").as("v_3"))
val df4 = df3.select($"k", $"v_3"(0).as("v_3_1"), $"v_3"(1).as("v_3_2"), $"v_3"(2).as("v_3_3") )
df4.show(false)

возвращает:

+---+-----+-----+-----+
|k  |v_3_1|v_3_2|v_3_3|
+---+-----+-----+-----+
|102|a    |b    |c    |
|103|1    |2    |3    |
|103|4    |5    |6    |
|103|7    |8    |null |
|104|r    |null |null |
+---+-----+-----+-----+
0 голосов
/ 31 января 2019

, возможно, есть лучшее решение, но я придумал это:

import java.util.Arrays
import org.apache.spark.sql._
import org.apache.spark.sql.types._

val employees = Array((101,Array("a","b","c","d","e")),(102,Array("q","w","e")),(103,Array("z","x","w","t","e","q","s")))
def f(emp:Int, num:Array[String]):Row={
  Row.fromSeq(s"${emp}" +: num)
}
val rowArray =for {
  x <- employees
  z <- x._2.sliding(3,3)
}yield f(x._1,Arrays.copyOf(z,3))

import spark.implicits._

val schema = StructType(
  List(StructField("emp", StringType, false),
    StructField("col1", StringType, true),
    StructField("col2", StringType, true),
    StructField("col3", StringType, true)))

val sqlContext=new SQLContext(sc)

val dfFromArray = sqlContext.createDataFrame(sc.parallelize(rowArray), schema)
dfFromArray.show

Он вернет вам что-то вроде этого:

+---+----+----+----+                                                            
|emp|col1|col2|col3|
+---+----+----+----+
|101|   a|   b|   c|
|101|   d|   e|null|
|102|   q|   w|   e|
|103|   z|   x|   w|
|103|   t|   e|   q|
|103|   s|null|null|
+---+----+----+----+
...