Перемешивание элементов СДР [List [Double]] в Spark - PullRequest
0 голосов
/ 05 июня 2018

В программе, которую я разрабатываю с использованием Spark 2.3 в Scala, у меня есть RDD[List[Double]].Каждый List[Double] имеет одинаковый размер.Я не могу понять, как выполнить преобразование, которое дает RDD

[1.0, 1.5, 4.0, 3.0],
[2.3, 5.6, 3.4, 9.0],
[4.5, 2.0, 1.0, 5.7]

преобразовать его в RDD

[2.3, 2.0, 1.0, 3.0],
[1.0, 5.6, 4.0, 5.7],
[4.5, 1.5, 3.4, 9.0]

, где каждый элемент спискапоменялись местами, поддерживая ту же позицию.

Например, первый элемент первого списка перемещается на первую позицию второго списка, второй элемент первого списка перемещается на вторую позицию третьего списка и т. Д.

Большое спасибо.

1 Ответ

0 голосов
/ 06 июня 2018

Один из подходов к перетасовке по столбцам мог бы состоять в том, чтобы разбить набор данных на отдельные одиночные столбцы DataFrames, каждый из которых перетасовывается с помощью orderBy(rand), а затем соединить их вместе.

Чтобы объединить перетасованныеDataFrames, RDD zipWithIndex применяется к каждому из них для создания идентификаторов строк.Обратите внимание, что monotonically_increasing_id не будет сокращать его, поскольку не гарантирует генерацию того же списка идентификаторов, необходимых для окончательного join.Следовательно, это довольно дорого из-за необходимого преобразования между RDD и DataFrame.

import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
import org.apache.spark.sql.Row

val rdd0 = sc.parallelize(Seq(
    List(1.0, 1.5, 4.0, 3.0),
    List(2.3, 5.6, 3.4, 9.0),
    List(4.5, 2.0, 1.0, 5.7)
  ))
//rdd0: org.apache.spark.rdd.RDD[List[Double]] = ...

val rdd = rdd0.map{ case x: Seq[Double] => (x(0), x(1), x(2), x(3)) }
val df = rdd.toDF("c1", "c2", "c3", "c4")

val shuffledDFs = df.columns.filter(_.startsWith("c")).map{ c =>
  val subDF = df.select(c)
  val subRDD = subDF.orderBy(rand).rdd.zipWithIndex.map{
    case (row: Row, id: Long) => Row.fromSeq(row.toSeq :+ id)
  }
  spark.createDataFrame( subRDD,
    StructType(subDF.schema.fields :+ StructField("idx", LongType, false))
  )
}

shuffledDFs.reduce( _.join(_, Seq("idx")) ).
  show
// +---+---+---+---+---+                                                           
// |idx| c1| c2| c3| c4|
// +---+---+---+---+---+
// |  0|2.3|2.0|4.0|9.0|
// |  1|1.0|5.6|3.4|3.0|
// |  2|4.5|1.5|1.0|5.7|
// +---+---+---+---+---+
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...