scala spark взрывает фрейм данных медленно - так, альтернативный метод - создает столбцы и строки из массивов в столбце - PullRequest
0 голосов
/ 16 мая 2018

Scala 2.11.8, spark 2.0.1

Функция разнесения очень медленная - поэтому ищем альтернативный метод.Я думаю, что это возможно с RDD с flatmap - и помощь очень ценится.

У меня есть udf, который возвращает List (String, String, String, Int) различной длины.Для каждой строки в фрейме данных я хочу создать несколько строк и создать несколько столбцов.

def Udf = udf ( (s: String ) => {
   if (s=="1") Seq(("a", "b", "c", 0), ("a1", "b1", "c1", 1), ("a2", "b2", "c2", 2)).toList   
       else Seq(("a", "b", "c", 0)).toList
})

val df = Seq(("a", "1"), ("b", "2")).toDF("A", "B")
val df1 = df.withColumn("C", Udf($"B"))
val df2 = df1.select($"A", explode($"C"))
val df3 = df2.withColumn("D", $"col._1").withColumn("E", $"col._2").withColumn("F", $"col._3").withColumn("G", $"col._4")

/// dataframe after going through udf
+---+---+--------------------+
|  A|  B|                   C|
+---+---+--------------------+
|  a|  1|[[a,b,c,0], [a1,b...|
|  b|  2|         [[a,b,c,0]]|
+---+---+--------------------+

///Final dataframe
+---+------------+---+---+---+---+
|  A|         col|  D|  E|  F|  G|
+---+------------+---+---+---+---+
|  a|   [a,b,c,0]|  a|  b|  c|  0|
|  a|[a1,b1,c1,1]| a1| b1| c1|  1|
|  a|[a2,b2,c2,2]| a2| b2| c2|  2|
|  b|   [a,b,c,0]|  a|  b|  c|  0|
+---+------------+---+---+---+---+

Это очень медленно на многих миллионах строк.Занимает более 12 часов.

Ответы [ 2 ]

0 голосов
/ 16 мая 2018

Вот еще один простой пример:

val ds = sc.parallelize(Seq((0, "Lorem ipsum dolor", 1.0, Array("prp1", "prp2", "prp3"))))

Альтернативный способ взрыва массивов с помощью flatMaps.

ds.flatMap { t => 
  t._4.map { prp => 
    (t._1, t._2, t._3, prp) }}.collect.foreach(println) 

Результат:

(0,Lorem ipsum dolor,1.0,prp1)
(0,Lorem ipsum dolor,1.0,prp2)
(0,Lorem ipsum dolor,1.0,prp3)

Пробовал с вашим набором данных, но не уверен, что это оптимальный способ сделать это.

df1.show(false)

+---+---+------------------------------------------------+
|A  |B  |C                                               |
+---+---+------------------------------------------------+
|a  |1  |[[a, b, c, 0], [a1, b1, c1, 1], [a2, b2, c2, 2]]|
|b  |2  |[[a, b, c, 0]]                                  |
+---+---+------------------------------------------------+


df1.rdd.flatMap { t:Row => t.getSeq(2).map { row: Row => (t.getString(0),t.getString(1),row)}}
.map {
    case (col1: String,col2: String, col3: Row) => (col1, col2,col3.getString(0),col3.getString(1),col3.getString(2),col3.getInt(3))
  }.collect.foreach(println)

Результат:

(a,1,a,b,c,0)
(a,1,a1,b1,c1,1)
(a,1,a2,b2,c2,2)
(b,2,a,b,c,0)  

Надеюсь, это поможет!

0 голосов
/ 16 мая 2018

Вы можете:

  • Обновить Spark до версии 2.3 или новее, где SPARK-21657 должно быть исправлено.
  • Заменить код на flatMap:

    df.as[(String, String)].flatMap { 
      case (a, "1") => Seq(
        (a, "a", "b", "c", 0), (a, "a1", "b1", "c1", 1), 
        (a, "a2", "b2", "c2", 2)
      )
      case (a, _) => Seq((a, "a", "b", "c", 0))
    }
    
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...