Это мой текстовый файл, входящий в программу:
Id Title Copy
B2002010 gyh 1
D2001001 abc 12
M2003005 zxc 3
D2002003 qwe 13
M2001002 efg 1
D2001004 asd 6
D2003005 zxc 3
M2001006 wer 6
D2001006 wer 6
B2004008 sxc 10
D2002007 sdf 9
D2004008 sxc 10
ID отформатирован как Xyyyyrrr
, где:
X
is B
=> Книга или M
=> Журнал
yyyy
это год
rrr
- случайное число.
Что мне нужно сделать, так это: получить общее количество копий книг или журналов того же года. Плюс небольшая очистка данных для столбца «копия», если я найду что-то отличное от числа, я заменю его на «0».
Мой проект Spark находится на Eclipse, и я использую Maven и Scala IDE
Мне нужно использовать функцию MapReduce.
Я запустил функцию Map, которая разбивает текстовый файл.
Это код, который я начал:
package bd.spark_app
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql._
import org.apache.spark.sql.types.IntegerType
import org.apache.spark.rdd.RDD.rddToOrderedRDDFunctions
import scala.io.Source
import org.apache.spark.sql.functions._
import scala.collection.mutable.WrappedArray
import org.apache.log4j._
import org.apache.spark.sql.types.{StructType, StructField, StringType}
import org.apache.spark.sql.Row
import scala.Array
object alla {
def main(args:Array[String]) = {
val conf = new SparkConf().setMaster("local").setAppName("trying")
val sc = new SparkContext(conf)
val x = sc.textFile("/home/hadoopusr/sampledata")
x.map(_.split(" ")).foreach(r =>
println(r(0).dropRight(3), r(2))
)
sc.stop()
}
}
Это мой результат для функции Map, которую я показал выше
(B2002,1)
(D2001,12)
(M2003,3)
(D2002,13)
(M2001,1)
(D2001,6)
(D2003,3)
(M2001,6)
(D2001,6)
(B2004,10)
(D2002,9)
(D2004,10)
(M2004,11)
(D2004,11)
Мне просто нужна какая-то функция сокращения, которая будет собирать все книги и журналы за один год и складывать количество копий вместе и проверять, чтобы в столбце "копия" были числа
Пример: с записями (B2002,12)
и (B2002,16)
результат должен быть (B2002,28)
.