Как сдвинуть элементы в СДР? - PullRequest
       11

Как сдвинуть элементы в СДР?

1 голос
/ 30 сентября 2019

Я новичок в Scala и пытаюсь выяснить, как сдвигать элементы в СДР.
Я читаю пары из файла CSV:

var listOfPairs = Spark.sc.textFile( <filePath> )
                  .map(aLine => aLine.split(","))
                  .map(aPair=> (aPair(0), aPair(1)))

Содержимое файла выглядит следующим образом:

a,1
b,2
c,3
d,4
e,5

В каждом цикле я хочу сдвинуть элементы один раз.

for (i <- 1 to numberOfLoops) { ...?... }

Каждый шаг будет выглядеть следующим образом, для numberOfLoops = 3:

[(a,1),(b,2),(c,3),(d,4),(e,5)]  
1: [ (b,2), (c,3), (d,4), (e,5), (a,1) ]  
2: [ (c,3), (d,4), (e,5), (a,1), (b,2) ]  
3: [ (d,4), (e,5), (a,1), (b,2), (c,3) ]  

1 Ответ

1 голос
/ 30 сентября 2019

Вот основная идея, как выполнить смену. Это может быть улучшено для производительности (особенно есть способ избежать нескольких итераций для многих последовательных смен) , но это оставлено в качестве упражнения для читателя.

Основа алгоритмадать каждому элементу уникальный ключ, затем создать копию данных со смещенным ключом и соединить их ключом.

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder.master("local[1]").getOrCreate()
val sc = spark.sparkContext
import spark.implicits._

val data = sc.parallelize(List("a,1", "b,2", "c,3", "d,4", "e,5"))
val listOfPairs = data.map(_.split(",")).map { case Array(a, b) => a -> b }

val indexed = listOfPairs.zipWithIndex.map { case (tuple, idx) => idx -> tuple }
val lastIndex = indexed.count() - 1

val newIndexed = indexed.map {
  case (idx, (a, b)) =>
    if (idx == lastIndex)
      (0L, (a, b))
    else
      (idx + 1, (a, b))
}

val shifted = newIndexed.join(indexed).map {
  case (_, ((a, _), (_, b))) => a -> b
}
...