Конвертировать RDD из Array (Row) в RDD из Row? - PullRequest
2 голосов
/ 16 апреля 2019

У меня есть такие данные в файле, и я хотел бы сделать некоторые статистические данные, используя Spark.

Содержимое файла:

aaa|bbb|ccc
ddd|eee|fff|ggg

Мне нужно назначить каждой строке идентификатор. Я читаю их как rdd и использую zipWithIndex().

Тогда они должны быть такими:

(0, aaa|bbb|ccc)
(1, ddd|eee|fff|ggg)

Мне нужно сделать каждую строку, связанную с идентификатором. Я могу получить СДР массива (Row), но не могу выпрыгнуть из массива.

Как мне изменить мой код?

import org.apache.spark.sql.{Row, SparkSession}

val fileRDD = spark.sparkContext.textFile(filePath)
val fileWithIdRDD = fileRDD.zipWithIndex()
// make the line like this: (0, aaa), (0, bbb), (0, ccc)
// each line is a record of Array(Row)
fileWithIdRDD.map(x => {
  val id = x._1
  val str = x._2
  val strArr = str.split("\\|")
  val rowArr = strArr.map(y => {
    Row(id, y)
  }) 
  rowArr 
})

Теперь это выглядит так:

[(0, aaa), (0, bbb), (0, ccc)]
[(1, ddd), (1, eee), (1, fff), (1, ggg)]

Но, наконец, я хочу:

(0, aaa)
(0, bbb) 
(0, ccc)
(1, ddd)
(1, eee)
(1, fff)
(1, ggg)

1 Ответ

2 голосов
/ 16 апреля 2019

Вам просто нужно сгладить RDD

yourRDD.flatMap(array => array)

С учетом вашего кода (исправлены некоторые ошибки, внутри внутренней карты и при назначении id и str)

fileWithIdRDD.map(x => {
  val id = x._1
  val str = x._2
  val strArr = str.split("\\|")
  val rowArr = strArr.map(y => {
    Row(id, y)
  }) 
  rowArr 
}).flatMap(array => array)

Быстрый пример здесь:

ВХОД

fileWithIdRDD.collect
res30: Array[(Int, String)] = Array((0,aaa|bbb|ccc), (1,ddd|eee|fff|ggg))

ОФОРМЛЕНИЕ

scala> fileWithIdRDD.map(x => {
      val id = x._1
      val str = x._2
      val strArr = str.split("\\|")
        val rowArr = strArr.map(y => {
          Row(id, y)
        })
      rowArr
      }).flatMap(array => array)


res31: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row] = MapPartitionsRDD[17] at flatMap at <console>:35

OUTPUT

scala> res31.collect
res32: Array[org.apache.spark.sql.Row] = Array([0,aaa], [0,bbb], [0,ccc], [1,ddd], [1,eee], [1,fff], [1,ggg])
...