У меня есть файл с некоторыми записями.
1,1,957,1,299.98,299.98
2,2,1073,1,199.99,199.99
3,2,502,5,250.0,50.0
4,2,403,1,129.99,129.99
5,4,897,2,49.98,24.99
6,4,365,5,299.95,59.99
7,4,502,3,150.0,50.0
8,4,1014,4,199.92,49.98
9,5,957,1,299.98,299.98
10,5,365,5,299.95,59.99
Я создал парный RDD, как показано ниже.
val orderItemsRDD = sc.textFile("/shiva/spark/input/data/retail_db/order_items")
val orderItemsPairedRDD = orderItemsRDD.map(order => {val x = order.split(","); (x(1).toInt,x(4).toFloat)})
выходной парный RDD имеет тип RDD [(Int, Float)], как показано ниже.
orderItemsPairedRDD: org.apache.spark.rdd.RDD[(Int, Float)] = MapPartitionsRDD[120] at map at <console>:26
Теперь я использую ReduByKey для получения суммы.
val orderItemsRBKRDD = orderItemsPairedRDD.reduceByKey((total, rev) => total + rev)
Но неожиданно вывод имеет тип RDD [(Int, String)]
orderItemsRBKRDD: org.apache.spark.rdd.RDD[(Int, String)] = ShuffledRDD[121] at reduceByKey at <console>:34
Если я проверяю выходные данные, это выглядит так.
scala> orderItemsRBKRDD.take(10).foreach(println)
(41234,102921,41234,249,2,109.94,54.97)
(65722,164249,65722,365,2,119.98,59.99164250,65722,730,5,400.0,80.0164251,65722,1004,1,399.98,399.98164252,65722,627,5,199.95,39.99164253,65722,191,2,199.98,99.99)
(28730,71921,28730,365,5,299.95,59.9971922,28730,502,1,50.0,50.0)
(68522,171323,68522,127,1,329.99,329.99)
(23776,59498,23776,1073,1,199.99,199.9959499,23776,403,1,129.99,129.99)
(32676,81749,32676,365,1,59.99,59.9981750,32676,627,4,159.96,39.9981751,32676,191,3,299.97,99.9981752,32676,1073,1,199.99,199.99)
(53926,134834,53926,365,2,119.98,59.99134835,53926,191,1,99.99,99.99)
(4926,12324,4926,1014,4,199.92,49.9812325,4926,1073,1,199.99,199.9912326,4926,365,4,239.96,59.9912327,4926,957,1,299.98,299.98)
(38926,97183,38926,191,5,499.95,99.9997184,38926,502,5,250.0,50.097185,38926,365,5,299.95,59.99)
(29270,73214,29270,365,5,299.95,59.9973215,29270,365,2,119.98,59.9973216,29270,1004,1,399.98,399.9873217,29270,627,4,159.96,39.9973218,29270,1004,1,399.98,399.98)
Я изменил свой код, как показано ниже.
scala> val orderItemsRBKRDD = orderItemsPairedRDD.reduceByKey((total, revenue) => total + revenue)
теперь вывод похож на приведенный ниже и соответствует ожидаемому (правильному).
orderItemsRBKRDD: org.apache.spark.rdd.RDD[(Int, Float)] = ShuffledRDD[123] at reduceByKey at <console>:28
и данные
scala> orderItemsRBKRDD.take(10).foreach(println)
(41234,109.94)
(65722,1319.8899)
(28730,349.95)
(68522,329.99)
(23776,329.98)
(32676,719.91003)
(53926,219.97)
(4926,939.85)
(38926,1049.9)
(29270,1379.8501)
единственное отличие или изменение заключается в том, что имя переменной изменяется с "rev" на "Выручка ".
Является ли " rev " зарезервированным словом в искре или скале?