Как эффективно сгруппировать каждые k строк в набор искровых данных? - PullRequest
0 голосов
/ 05 ноября 2018

Я создал набор данных искры [Row], а Row is Row (x: Vector). х здесь 1xp вектор.

Возможно ли: 1) сгруппировать каждые k строк 2) объединить эти строки в матрицу k x p - т. Е. Заменить Dateset [Row (Vector)] на Dateset [Row (Matrix)]?

Вот мое текущее решение, преобразовать этот набор данных [Row] в RDD и объединить каждые k строк с помощью zipWithIndex и aggregateByKey.

val dataRDD = data_df.rdd.zipWithIndex
    .map {  case (line, index) =>  (index/k, line) }
    .aggregateByKey(...) (..., ...)

Но, похоже, это не очень эффективно, есть ли более эффективный способ сделать это?

Заранее спасибо.

Ответы [ 2 ]

0 голосов
/ 05 ноября 2018

Вот решение, которое группирует N записей в столбцы:

Генерация от RDD до DF и обработка, как показано ниже.

g является группой, k является ключом к номеру записи, который повторяется в пределах g. v ваш контент записи.

Ввод - это файл из 6 строк, и я использовал группы из 3 здесь.

Единственный недостаток - если у строк остаток меньше, чем у группы N.

import org.apache.spark.sql.functions._
import org.apache.spark.mllib.rdd.RDDFunctions._

val dfsFilename = "/FileStore/tables/7dxa9btd1477497663691/Text_File_01-880f5.txt"
val readFileRDD = spark.sparkContext.textFile(dfsFilename)
val rdd2 = readFileRDD.sliding(3,3).zipWithIndex
val rdd3 = rdd2.map(r => (r._1.zipWithIndex, r._2))
val df = rdd3.toDF("vk","g")

val df2 = df.withColumn("vke", explode($"vk")).drop("vk")
val df3 = df2.withColumn("k", $"vke._2").withColumn("v", $"vke._1").drop("vke")

val result = df3
            .groupBy("g")
            .pivot("k")
            .agg(expr("first(v)"))

result.show()

возвращается:

+---+--------------------+--------------------+--------------------+
|  g|                   0|                   1|                   2|
+---+--------------------+--------------------+--------------------+
|  0|The quick brown f...|Here he lays I te...|Gone are the days...|
|  1|  Gosh, what to say.|Hallo, hallo, how...|          I am fine.|
+---+--------------------+--------------------+--------------------+
0 голосов
/ 05 ноября 2018

В вашем подходе есть две проблемы с производительностью:

  1. Использование глобального заказа
  2. Выполнение шаффла для построения групп k

Если вам абсолютно необходим глобальный порядок, начиная со строки 1, и вы не можете разбить данные на несколько разделов, тогда Spark должен переместить все данные через одно ядро. Вы можете ускорить эту часть, найдя способ иметь более одного раздела.

Вы можете избежать случайного перемешивания, обрабатывая данные по одному разделу за раз, используя mapPartitions:

spark.range(1, 20).coalesce(1).mapPartitions(_.grouped(5)).show

+--------------------+
|               value|
+--------------------+
|     [1, 2, 3, 4, 5]|
|    [6, 7, 8, 9, 10]|
|[11, 12, 13, 14, 15]|
|    [16, 17, 18, 19]|
+--------------------+

Обратите внимание, что coalesce(1) выше вынуждает все 20 строк в один раздел.

...