Я новичок в Apache Spark, а также в Scala, в настоящее время изучаю этот фреймворк и язык программирования для больших данных.У меня есть пример файла, который я пытаюсь выяснить для данного поля общее количество другого поля и его количество и список значений из другого поля.Я попробовал самостоятельно, и кажется, что я не пишу в лучшем подходе в spark rdd
(как начало).
Пожалуйста, найдите ниже примерные данные (Customerid: Int, Orderid: Int, Amount: Float)
:
44,8602,37.19
35,5368,65.89
2,3391,40.64
47,6694,14.98
29,680,13.08
91,8900,24.59
70,3959,68.68
85,1733,28.53
53,9900,83.55
14,1505,4.32
51,3378,19.80
42,6926,57.77
2,4424,55.77
79,9291,33.17
50,3901,23.57
20,6633,6.49
15,6148,65.53
44,8331,99.19
5,3505,64.18
48,5539,32.42
Мой текущий код:
((sc.textFile("file://../customer-orders.csv").map(x => x.split(",")).map(x => (x(0).toInt,x(1).toInt)).map{case(x,y) => (x, List(y))}.reduceByKey(_ ++ _).sortBy(_._1,true)).
fullOuterJoin(sc.textFile("file://../customer-orders.csv").map(x =>x.split(",")).map(x => (x(0).toInt,x(2).toFloat)).reduceByKey((x,y) => (x + y)).sortBy(_._1,true))).
fullOuterJoin(sc.textFile("file://../customer-orders.csv").map(x =>x.split(",")).map(x => (x(0).toInt)).map(x => (x,1)).reduceByKey((x,y) => (x + y)).sortBy(_._1,true)).sortBy(_._1,true).take(50).foreach(println)
Получен такой результат:
(49,(Some((Some(List(8558, 6986, 686....)),Some(4394.5996))),Some(96)))
Ожидаемый результат как:
customerid, (orderids,..,..,....), totalamount, number of orderids
Есть ли лучший подход?Я только что попробовал combineByKey
с приведенным ниже кодом, но println
внутри не печатает.
scala> val reduced = inputrdd.combineByKey(
| (mark) => {
| println(s"Create combiner -> ${mark}")
| (mark, 1)
| },
| (acc: (Int, Int), v) => {
| println(s"""Merge value : (${acc._1} + ${v}, ${acc._2} + 1)""")
| (acc._1 + v, acc._2 + 1)
| },
| (acc1: (Int, Int), acc2: (Int, Int)) => {
| println(s"""Merge Combiner : (${acc1._1} + ${acc2._1}, ${acc1._2} + ${acc2._2})""")
| (acc1._1 + acc2._1, acc1._2 + acc2._2)
| }
| )
reduced: org.apache.spark.rdd.RDD[(String, (Int, Int))] = ShuffledRDD[27] at combineByKey at <console>:29
scala> reduced.collect()
res5: Array[(String, (Int, Int))] = Array((maths,(110,2)), (physics,(214,3)), (english,(65,1)))
Я использую Spark версии 2.2.0, Scala 2.11.8 и Java 1.8 build 101