Как найти максимальное и минимальное значения одновременно, используя агрегат по ключу в искре? - PullRequest
0 голосов
/ 21 февраля 2020

Я попробовал этот код, чтобы узнать, но я получил ошибку:

val keysWithValuesList = Array("1=2000", "2=1800", "2=3000", "3=2500", "4=1500")
val data = sc.parallelize(keysWithValuesList,2)
val kv = data.map(_.split("=")).map(v => (1, v(1).toInt))
val initialCount = kv.first._2
val maxi = (x: Int, y: Int) => if (x>y) x else y 
val mini = (x: Int, y: Int) => if (x>y) y else x 
val maxP = (p1: Int, p2: Int) => if (p1>p2) p1 else p2
val minP = (p1: Int, p2: Int) => if (p1>p2) p2 else p1
val max_min = kv.aggregateByKey(initialCount)((maxi,mini),(maxP,minP))

ошибка: -

command-2654386024166474:13: error: type mismatch;
 found   : ((Int, Int) => Int, (Int, Int) => Int)
 required: (Int, Int) => Int
val max_min = kv.aggregateByKey(initialCount)((maxi,mini),(maxP,minP))
                                              ^
command-2654386024166474:13: error: type mismatch;
 found   : ((Int, Int) => Int, (Int, Int) => Int)
 required: (Int, Int) => Int
val max_min = kv.aggregateByKey(initialCount)((maxi,mini),(maxP,minP))

Есть ли другой способ? Пожалуйста, предложите

Ответы [ 2 ]

0 голосов
/ 21 февраля 2020

Я нашел свое решение: -

val list = Array("1=2000", "2=1800", "2=500", "3=2500", "4=4500")
val data = sc.parallelize(list,6)
//Create key value pairs
val kv = data.map(_.split("=")).map(v => (1, v(1).toInt))
val initialCount = (kv.first._2, kv.first._2)
val min_max = (x:(Int,Int),y:Int) => {(if (x._1>y) x._1 else y, if(x._2>y) y else x._2)} 
val min_maxP=(p1:(Int,Int),p2:(Int,Int)) => {(if (p1._1>p2._1) p1._1 else p2._1, if (p1._2>p2._2) p2._2 else p1._2)}
val minimum = kv.aggregateByKey(initialCount)(min_max,min_maxP)
minimum.first._2

Вывод: -

list: Array[String] = Array(1=2000, 2=1800, 2=500, 3=2500, 4=4500)
data: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[164] at parallelize at command-110260081440638:2
kv: org.apache.spark.rdd.RDD[(Int, Int)] = MapPartitionsRDD[166] at map at command-110260081440638:4
initialCount: (Int, Int) = (2000,2000)
min_max: ((Int, Int), Int) => (Int, Int) = <function2>
min_maxP: ((Int, Int), (Int, Int)) => (Int, Int) = <function2>
minimum: org.apache.spark.rdd.RDD[(Int, (Int, Int))] = ShuffledRDD[167] at aggregateByKey at command-110260081440638:8
res29: (Int, Int) = (4500,500)
0 голосов
/ 21 февраля 2020

Можно выполнять две операции сокращения одновременно, но вам нужно будет использовать кортежи. Сначала отформатируйте ваш RDD, чтобы дублировать значение:

val rddMinMax = kv.map(x => (x._1, (x._2, x._2)))

Затем используйте эту функцию, чтобы уменьшить вдвое для каждой пары:

val minAndMax = ((l1: (Int, Int), l2: (Int, Int)) => (mini(l1._1, l2._1), maxi(l1._2, l2._2)))
rddMinMax.reduceByKey(minAndMax).collect()
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...