Как сравнить 2 столбца и объединить в Scala - PullRequest
3 голосов
/ 09 июня 2019

Это мой текстовый файл, входящий в программу:

Id       Title Copy
B2002010 gyh   1
D2001001 abc   12
M2003005 zxc   3
D2002003 qwe   13
M2001002 efg   1
D2001004 asd   6
D2003005 zxc   3
M2001006 wer   6
D2001006 wer   6
B2004008 sxc   10
D2002007 sdf   9
D2004008 sxc   10

ID отформатирован как Xyyyyrrr, где:

  • X is B => Книга или M => Журнал
  • yyyy это год
  • rrr - случайное число.

Что мне нужно сделать, так это: получить общее количество копий книг или журналов того же года. Плюс небольшая очистка данных для столбца «копия», если я найду что-то отличное от числа, я заменю его на «0».

Мой проект Spark находится на Eclipse, и я использую Maven и Scala IDE Мне нужно использовать функцию MapReduce.

Я запустил функцию Map, которая разбивает текстовый файл.

Это код, который я начал:

package bd.spark_app

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql._
import org.apache.spark.sql.types.IntegerType
import org.apache.spark.rdd.RDD.rddToOrderedRDDFunctions
import scala.io.Source
import org.apache.spark.sql.functions._
import scala.collection.mutable.WrappedArray
import org.apache.log4j._
import org.apache.spark.sql.types.{StructType, StructField, StringType}
import org.apache.spark.sql.Row
import scala.Array

object alla {
  def main(args:Array[String]) = {
    val conf = new SparkConf().setMaster("local").setAppName("trying")
    val sc = new SparkContext(conf)
    val x = sc.textFile("/home/hadoopusr/sampledata")

    x.map(_.split(" ")).foreach(r => 
      println(r(0).dropRight(3), r(2))
    )

    sc.stop()
  }
} 

Это мой результат для функции Map, которую я показал выше

(B2002,1)
(D2001,12)
(M2003,3)
(D2002,13)
(M2001,1)
(D2001,6)
(D2003,3)
(M2001,6)
(D2001,6)
(B2004,10)
(D2002,9)
(D2004,10)
(M2004,11)
(D2004,11)

Мне просто нужна какая-то функция сокращения, которая будет собирать все книги и журналы за один год и складывать количество копий вместе и проверять, чтобы в столбце "копия" были числа

Пример: с записями (B2002,12) и (B2002,16) результат должен быть (B2002,28).

1 Ответ

2 голосов
/ 09 июня 2019

Можно использовать метод «reduByKey»:

val converted = x.map(_.split(" ")).map(r => (r(0).dropRight(3), r(2).toInt))
val result = converted.reduceByKey(_ + _)

Выход:

(M2001,7)
(D2001,24)
(M2003,3)
(D2003,3)
(D2002,22)
(D2004,10)
(B2002,1)
(B2004,10)

Примечание: похоже на то, что входной файл имеет формат "csv", и лучше использовать "spark.read.csv" для чтения данных и работать с DataFrame вместо RDD.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...