Сортировка списка значений в СДР в Scala - PullRequest
0 голосов
/ 21 мая 2019

У меня есть RDD, в котором мой ключ является идентификатором, а значения включают список идентификаторов.Я хочу отсортировать список значений в порядке возрастания. Например,

1, list(12,3,8,10)
2, list(42,3,65,33)
3, list(6,2,4,1)

Вывод

1, list(3,8,10,12)
2, list(3,33,42,65)
3, list(1,2,4,6)

Создание RDD Таким образом, я создал RDD после соединения двух разных RDDа затем использовал его productIterator для создания list of values, что дает мне СДР типа RDD(Int, List[Any])

Я пробовал rdd.mapValues(x=> _.2.sorted) различные методы сортировки, но не повезло

1 Ответ

2 голосов
/ 21 мая 2019

Ты почти у цели.

mapValues, как следует из названия, применяет вашу функцию отображения только к значениям.Ваш код выглядит так, как будто вы пытаетесь извлечь второй элемент из кортежа ключ / значение, который, я полагаю, выдает ошибки.

Вы можете использовать map или mapValues.map необходимо вернуть кортеж, если вы хотите сохранить свой ключ, поэтому mapValues проще, но я покажу вам оба пути.Итак, мы начинаем с RDD[(Int, List[Int])], который я уже построил, и используем collect() для его просмотра.

scala> start
res17: org.apache.spark.rdd.RDD[(Int, List[Int])] = MapPartitionsRDD[6] at map at <console>:37

scala> start.collect()
res18: Array[(Int, List[Int])] = Array((1,List(12, 3, 8, 10)), (2,List(42, 3, 65, 33)))

Во-первых, давайте сделаем простейшую вещь:

scala> start.mapValues(x => x.sorted).collect()
res19: Array[(Int, List[Int])] = Array((1,List(3, 8, 10, 12)), (2,List(3, 33, 42, 65)))

Как видите, он возвращает ожидаемый порядок.

Использовать map для изменения кортежа ключ / значение довольно просто, если вы сохраняете ключ.Я рекомендую использовать синтаксис case-функции Scala для разбиения кортежа на именованные аргументы вместо того, чтобы ссылаться на tuple._1 / tuple._2

scala> start.map({ case (k, v) => (k, v.sorted) }).collect()
res21: Array[(Int, List[Int])] = Array((1,List(3, 8, 10, 12)), (2,List(3, 33, 42, 65)))

Но используя синтаксис кортежа, с которым вы знакомы:

scala> start.map(x => (x._1, x._2.sorted)).collect()
res22: Array[(Int, List[Int])] = Array((1,List(3, 8, 10, 12)), (2,List(3, 33, 42, 65)))

Надеюсь, это поможет. Редактировать так как похоже, что ваша проблема связана с отсутствием информации о типе, я добавил, как я создал свой RDD, который я использовал для выполнения сценариев.

val input: Array[Array[Int]] = Array(Array(1, 12, 3, 8, 10), Array(2, 42, 3, 65, 33))

val start: RDD[(Int, List(Int)] = sc.parallelize(input).map({ 
  case Array(key, value @ _*) => (key, value.toList)
})

Если выпосмотрите на сигнатуру метода для :A](implicitord:scala.math.Ordering[B]):Repr" rel="nofollow noreferrer"> List.sorted , вы увидите, что он имеет неявный параметр, который сообщает Scala, как сортировать список.

Scala предоставляет реализации по умолчанию для таких вещей, как числа и строки,но он находит неявную реализацию в зависимости от типа вашего списка.Он не имеет значения по умолчанию для списка Any, который эквивалентен списку Object в Java.Поэтому, если вы сможете изменить свой вопрос, добавив в него больше кода, это поможет определить, где вы теряете информацию этого типа.

...