Spark: как сгруппировать один столбец элементов на основе двух других столбцов элементов в СДР - PullRequest
0 голосов
/ 05 марта 2019

У меня есть RDD с 3 столбцами (road_idx, snodeidx, enodeidx).Выглядит это так:

(roadidx_995, 1138, 1145)
(roadidx_996, 1138, 1139)
(roadidx_997, 2740, 1020)
(roadidx_998, 2762, 2740)
(roadidx_999, 3251, 3240)
.........

Как сгруппировать road_idx, которые имеют один из общих snodeidx или enodeidx?Дайте каждой группе номер, начинающийся с 1.

ожидаемый результат:

(1,[roadidx_995,roadidx_996])
(2,[roadidx_997,roadidx_998])
(3,[roadidx_999])

, как показано выше,

roadidx_995 и roadidx_996 имеют одинаковые snodeidx 1138.

roadidx_997 имеет snodeidx , такой же, как enodeidx roadidx_998, который равен 2740.

roadidx_999 входит в группу самостоятельно.

Код Scala или код Python в порядке.Пока вы можете рассказать мне логику использования API RDD для получения ожидаемого результата.

Очень признателен!

1 Ответ

0 голосов
/ 05 марта 2019

Может быть реализовано как:

  1. Разделить оригинал на два rdd - сгруппированные по узлам "start" и "end".
  2. Объединить набор исходных данных со значениями от 1) несколько раз и получим четыре столбца, например:

    |------------------|----------------|--------------|----------------|
    | start join start | start join end | end join end | end join start |
    |------------------|----------------|--------------|----------------|
    
  3. Объединить значения из четырех столбцов в один

В Scala можно реализовать:

val data = List(
  ("roadidx_995", 1138, 1145),
  ("roadidx_996", 1138, 1139),
  ("roadidx_997", 2740, 1020),
  ("roadidx_998", 2762, 2740),
  ("roadidx_999", 3251, 3240)
)
val original = sparkContext.parallelize(data)

val groupedByStart = original.map(v => (v._1, v._2)).groupBy(_._2).mapValues(_.map(_._1))
val groupedByEnd = original.map(v => (v._1, v._3)).groupBy(_._2).mapValues(_.map(_._1))
val indexesOnly = original.map(allRow => (allRow._2, allRow._3))

// join by start value
val startJoinsStart = indexesOnly.keyBy(_._1).join(groupedByStart)
val startJoinsEnd = startJoinsStart.leftOuterJoin(groupedByEnd)

// join by end value
val endKeys = startJoinsEnd.values.keyBy(_._1._1._2)

val endJoinsEnd = endKeys.join(groupedByEnd)
val endJoinsStart = endJoinsEnd.leftOuterJoin(groupedByStart)

// flatten to output format
val result = endJoinsStart
  .values
  .map(v => (v._1._1._1._2, v._1._1._2, v._1._2, v._2))
  .map(v => v._1 ++ v._2.getOrElse(Seq()) ++ v._3 ++ v._4.getOrElse(Seq()))
  .map(_.toSet)
  .distinct()

result.foreach(println)

Вывод:

Set(roadidx_995, roadidx_996)
Set(roadidx_998, roadidx_997)
Set(roadidx_999)
...