Сравнивать по уменьшению Scala - PullRequest
0 голосов
/ 10 июля 2019

У меня есть список пар (id- (имя-значение)). Вот так

val input =  sc.parallelize(Array(Array(1, "a 10"),
                                  Array(1, "b 11"), 
                                  Array(3, "a 12"),
                                  Array(3, "b 13"),
                                  Array(3, "c 14"),
                                  Array(4, "b 15")))


Фаза карты имеет ключ - это идентификатор, а значение - (имя-значение) строка.

val rdd = input.map(x => (x(0), x(1)))

Мой ожидаемый результат: для каждого идентификатора сравнивайте значения, основанные на имени, с функцией af ().

Например, с id == "3", мы получили результат после сокращенияфаза:

(key: ab, value: f(12,13))
(key: ac, value: f(12,14))
(key: bc, value: f(13,14))

1 Ответ

1 голос
/ 10 июля 2019

СДР может быть объединен с самим собой для получения всех пар, и только требуемая строка может быть оставлена ​​путем фильтрации:

// split string value on two parts
val rdd = input.map(x => (x(0), x(1).toString.split(" ")))
  .map({ case (key, parts) => (key, (parts(0), parts(1))) })

// join , filter, and transform to expected
val both = rdd
  .join(rdd)
  .filter({ case (_, (v1, v2)) => v1._1 < v2._1 })
  .map({ case (key, (v1, v2)) => (s"[$key] key: " + v1._1 + v2._1, s"value: f(${v1._2},${v2._2})") })

Выход:

([1] key: ab,value: f(10,11))
([3] key: ab,value: f(12,13))
([3] key: ac,value: f(12,14))
([3] key: bc,value: f(13,14))

PS: здесь можно использовать расширенную фильтрацию.

...