Как получить пересечение двух СДР [(String, Iterable [String])] - PullRequest
0 голосов
/ 02 июня 2018

Данные состоят из двух столбцов

A B
A C
A D
B A
B C
B D
B E
C A
C B
C D
C E
D A
D B
D C
D E
E B
E C
E D

В первом ряду представьте, что A дружит с B и т. Д. Как мне найти их общих друзей?

(A,B) -> (C D)

То есть у A и B есть общие друзья C и D. Я подошел так же близко, как и к groupByKey, со следующим результатом.

(B,CompactBuffer(A, C, D, E))
(A,CompactBuffer(B, C, D))
(C,CompactBuffer(A, B, D, E))
(E,CompactBuffer(B, C, D))
(D,CompactBuffer(A, B, C, E))

Код:

val rdd: RDD[String] = spark.sparkContext.textFile("twocols.txt")
val splitrdd: RDD[(String, String)] = rdd.map { s =>
  var str = s.split(" ")
  new Tuple2(str(0), str(1))
}
val group: RDD[(String, Iterable[String])] = splitrdd.groupByKey()
group.foreach(println)

Ответы [ 4 ]

0 голосов
/ 02 июня 2018

Сначала swap элементы:

val swapped = splitRDD.map(_.swap)

Затем самостоятельное соединение и swap назад:

val shared =  swapped.join(swapped).map(_.swap)

Наконец, отфильтруйте дубликаты (при необходимости) и groupByKey:

shared.filter { case ((x, y), _) => x < y }.groupByKey
0 голосов
/ 02 июня 2018

Продолжая с того места, где вы остановились:

val group: RDD[(String, Iterable[String])] = splitrdd.groupByKey()
val group_map = group.collectAsMap
val common_friends = group
  .flatMap{case (x, friends) => 
    friends.map{y => 
      ((x,y),group_map.get(y).get.toSet.intersect(friends.toSet))
    }
  }

scala> common_friends.foreach(println)
((B,A),Set(C, D))
((B,C),Set(A, D, E))
((B,D),Set(A, C, E))
((B,E),Set(C, D))
((D,A),Set(B, C))
((D,B),Set(A, C, E))
((D,C),Set(A, B, E))
((D,E),Set(B, C))
((A,B),Set(C, D))
((A,C),Set(B, D))
((A,D),Set(B, C))
((C,A),Set(B, D))
((C,B),Set(A, D, E))
((C,D),Set(A, B, E))
((C,E),Set(B, D))
((E,B),Set(C, D))
((E,C),Set(B, D))
((E,D),Set(B, C))

Примечание: это предполагает, что ваши данные имеют отношение в обоих направлениях, как в вашем примере: (AB и BA).Если это не так, вам нужно добавить код, чтобы справиться с тем фактом, что group_map.get(y) может вернуть None.

0 голосов
/ 02 июня 2018

Так что я закончил делать это на стороне клиента.НЕ ДЕЛАЙТЕ ЭТОГО

val arr: Array[(String, Iterable[String])] = group.collect()
//arr.foreach(println)
var arr2 = scala.collection.mutable.Set[((String, String), List[String])]()
for (i <- arr)
  for (j <- arr)
    if (i != j) {
      val s1 = i._2.toSet
      val s2 = j._2.toSet
      val s3 = s1.intersect(s2).toList
      //println(s3)
      val pair = if (i._1 < j._1) (i._1, j._1) else (j._1, i._1)
      arr2 += ((pair, s3))
    }

arr2.foreach(println)

Результат

((B,E),List(C, D))
((A,C),List(B, D))
((A,B),List(C, D))
((A,D),List(B, C))
((B,D),List(A, C, E))
((C,D),List(A, B, E))
((B,C),List(A, D, E))
((C,E),List(B, D))
((D,E),List(B, C))
((A,E),List(B, C, D))

Мне интересно, могу ли я сделать это, используя преобразования в Spark.

0 голосов
/ 02 июня 2018

Это просто уродливая попытка:

Предположим, вы преобразовали свои два столбца в Array[Array[String]] (или List[List[String]], это действительно одно и то же), скажем

val pairs=Array(
Array("A","B"),
Array("A","C"),
Array("A","D"),
Array("B","A"),
Array("B","C"),
Array("B","D"),
Array("B","E"),
Array("C","A"),
Array("C","B"),
Array("C","D"),
Array("C","E"),
Array("D","A"),
Array("D","B"),
Array("D","C"),
Array("D","E"),
Array("E","B"),
Array("E","C"),
Array("E","D")
)

Определитегруппа, для которой вы хотите найти своих общих друзей:

val group=Array("C","D")

Следующая команда найдет друзей для каждого члена вашей группы

val friendsByMemberOfGroup=group.map(
i => pairs.filter(x=> x(1) contains i)
.map(x=>x(0))
)

Например, pairs.filter(x=>x(1) contains "C").map(x=>x(0)) возвращаетдрузья из "C", где "C" берется из второго столбца, а его друзья - из первого столбца:

scala> pairs.filter(x=> x(1) contains "C").map(x=>x(0))
res212: Array[String] = Array(A, B, D, E)

И следующий цикл найдет общих друзей всех участников в вашем столбце.group

var commonFriendsOfGroup=friendsByMemberOfGroup(0).toSet
for(i <- 1 to friendsByMemberOfGroup.size-1){
commonFriendsOfGroup=
commonFriendsOfGroup.intersect(friendsByMemberOfGroup(i).toSet)
}

Таким образом, вы получаете

scala> commonFriendsOfGroup.toArray
res228: Array[String] = Array(A, B, E)

Если вы измените свою группу на val group=Array("A","B","E") и примените предыдущие строки, то вы получите

scala> commonFriendsOfGroup.toArray
res230: Array[String] = Array(C, D)
...