Вложенная карта Scala в Spark RDD - PullRequest
0 голосов
/ 24 мая 2019

Я пытаюсь преобразовать список карт (Seq [Map [String, Map [String, String]]) в таблицу / кортеж RDD, где каждая пара ключ -> значение на карте плоско отображается в кортежс ключом внешней карты.Например,

Map(
 1 -> Map('k' -> 'v', 'k1' -> 'v1')
)  

становится

(1, 'k', 'v')
(1, 'k1', 'v1')

Я пробовал следующий подход, но, похоже, он не работает в случае проблем параллелизма.У меня есть два рабочих узла, и он дублирует ключ -> значение дважды (что я предполагаю, потому что я делаю это неправильно)

Допустим, я держу свой тип карты в классе дела 'Записи'

  val rdd = sc.parallelize(1 to records.length)
    val recordsIt = records.iterator
      val res: RDD[(String, String, String)] = rdd.flatMap(f => {
        val currItem = recordsIt.next()
        val x: immutable.Iterable[(String, String, String)] = currItem.mapData.map(v => {
          (currItem.identifier, v._1, v._2)
        })
        x
      }).sortBy(r => r)

Есть ли способ распараллелить эту работу, не сталкиваясь с серьезными проблемами параллелизма (как я подозреваю, происходит?)

пример дублированного вывода

(201905_001ac172c2751c1d4f4b4cb0affb42ef_gFF0dSg4iw,CID,B13131608623827542)
(201905_001ac172c2751c1d4f4b4cb0affb42ef_gFF0dSg4iw,CID,B13131608623827542)
(201905_001ac172c2751c1d4f4b4cb0affb42ef_gFF0dSg4iw,ROD,19190321)
(201905_001ac172c2751c1d4f4b4cb0affb42ef_gFF0dSg4iw,ROD,19190321)
(201905_001b3ba44f6d1f7505a99e2288108418_mSfAfo31f8,CID,339B4C3C03DDF96AAD)
(201905_001b3ba44f6d1f7505a99e2288108418_mSfAfo31f8,CID,339B4C3C03DDF96AAD)
(201905_001b3ba44f6d1f7505a99e2288108418_mSfAfo31f8,ROD,19860115)
(201905_001b3ba44f6d1f7505a99e2288108418_mSfAfo31f8,ROD,19860115)

1 Ответ

2 голосов
/ 24 мая 2019

Spark parallelize очень эффективен с самого начала (поскольку вы уже начинаете хранить данные в памяти, гораздо дешевле просто перебирать их локально), тем не менее, более идиоматический подход будет простым flatMap:

sc.parallelize(records.toSeq)
  .flatMapValues(identity)
  .map { case (k1, (k2, v)) => (k1, k2, v) } 
...