Как мне объединить два ряда на основе общего поля? - PullRequest
0 голосов
/ 14 апреля 2020

Я очень плохо знаком с scala и учусь работать с RDDS. У меня есть два файла CSV, которые имеют следующие заголовки и данные: csv1.txt:

id,"location", "zipcode" 
1, "a", "12345"
2, "b", "67890"
3, "c" "54321"

csv2.txt:

"location_x", "location_y", trip_hrs
"a", "b", 1
"a", "c", 3
"b", "c", 2
"a", "b", 1
"c", "b", 2

В основном, данные CSV1 представляют собой отдельный набор местоположений и почтовые индексы, тогда как данные csv2 имеют продолжительность поездки между location_x и location_y. Общей информацией в этих двух наборах данных является location в csv1 и location_x в csv 2, даже если они имеют разные имена заголовков.

Я хотел бы создать две трети, одна из которых содержит данные из csv1, а другая из csv2. Затем я хотел бы присоединиться к этим двум местам и вернуть местоположение, почтовый индекс и сумму всех поездок из этого места, как показано ниже:

("a", "zipcode", 5)
("b", "zipcode", 2)
("c", "zipcode", 2)

Мне было интересно, может ли один из вас помочь мне с этим проблема. Спасибо.

Ответы [ 2 ]

0 голосов
/ 14 апреля 2020

Если вы уже можете прочитать CSV в RDD, поездки можно суммировать, а затем объединить с местоположениями:

val tripsSummarized = trips
  .map({ case (location, _, hours) => (location, hours) })
  .reduceByKey((hoursTotal, hoursIncrement) => hoursTotal + hoursIncrement)

val result = locations
  .map({ case (_, location, zipCode) => (location, zipCode) })
  .join(tripsSummarized)
  .map({case (location, (zipCode, hoursTotal)) => (location, zipCode, hoursTotal) })

Если требуются местоположения без отключений, можно использовать «leftOuterJoin».

0 голосов
/ 14 апреля 2020

Я дам вам код (полное приложение в IntelliJ) с некоторыми пояснениями. Я надеюсь, что это может быть полезно.

Пожалуйста, прочитайте документацию по spark для подробных подробностей.

http://spark.apache.org/docs/2.3.0/rdd-programming-guide.html#working -with-key-value-pair

Эту проблему можно решить с помощью Spark Dataframes, вы можете попробовать сами.

import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.SparkSession

object Joining {

  val spark = SparkSession
    .builder()
    .appName("Joining")
    .master("local[*]")
    .config("spark.sql.shuffle.partitions", "4") //Change to a more reasonable default number of partitions for our data
    .config("spark.app.id", "Joining")  // To silence Metrics warning
    .getOrCreate()

  val sc = spark.sparkContext

  val path = "/home/cloudera/files/tests/"

  def main(args: Array[String]): Unit = {

    Logger.getRootLogger.setLevel(Level.ERROR)

    try {

      // read the files
      val file1 = sc.textFile(s"${path}join1.csv")
      val header1 = file1.first // extract the header of the file
      val file2 = sc.textFile(s"${path}join2.csv")
      val header2 = file2.first // extract the header of the file

      val rdd1 = file1
        .filter(line => line != header1) // to leave out the header
        .map(line => line.split(",")) // split the lines => Array[String]
        .map(arr => (arr(1).trim,arr(2).trim)) // to make up a pairRDD with arr(1) as key and zipcode

      val rdd2 = file2
          .filter(line => line != header2)
          .map(line => line.split(",")) // split the lines => Array[String]
          .map(arr => (arr(0).trim, arr(2).trim.toInt)) // to make up a pairRDD with arr(0) as key and trip_hrs

      val joined = rdd1 // join the pairRDD by its keys
          .join(rdd2)
          .cache()  // cache joined in memory

      joined.foreach(println) // checking data
      println("**************")

//      ("c",("54321",2))
//      ("b",("67890",2))
//      ("a",("12345",1))
//      ("a",("12345",3))
//      ("a",("12345",1))

      val result = joined.reduceByKey({ case((zip, time), (zip1, time1) ) => (zip, time + time1) })

      result.map({case( (id,(zip,time)) ) => (id, zip, time)}).foreach(println) // checking output

//      ("b","67890",2)
//      ("c","54321",2)
//      ("a","12345",5)

      // To have the opportunity to view the web console of Spark: http://localhost:4041/
      println("Type whatever to the console to exit......")
      scala.io.StdIn.readLine()
    } finally {
      sc.stop()
      println("SparkContext stopped")
      spark.stop()
      println("SparkSession stopped")
    }
  }
}

С уважением.

...