Scala функция сортировки и сбора - PullRequest
0 голосов
/ 02 мая 2020

Я новичок в Spark в Scala. Поэтому я пишу программу, в которой я читаю CSV-файл, а затем подсчитываю общие расходы на определенный идентификационный номер. Поэтому после подсчета расходов, когда я сортирую RDD с помощью sortByKey (), он не сортирует RDD должным образом, а после применения collect () печатает надлежащим образом.

Перед сбором ()

(0,5524.9497)
(51,4975.2197)
(1,4958.5996)
(52,5245.0605)
(2,5994.591)
(53,4945.3)
(3,4659.63)
(4,4815.05)
(5,4561.0703)
(6,5397.8794)
(7,4755.0693)
(8,5517.24)
(9,5322.6494)
(10,4819.6997)```

**After Collect**

```(0,5524.9497)
(1,4958.5996)
(2,5994.591)
(3,4659.63)
(4,4815.05)
(5,4561.0703)
(6,5397.8794)
(7,4755.0693)
(8,5517.24)
(9,5322.6494)
(10,4819.6997) ```


**Code**

 ``` def main(args: Array[String])= {

    Logger.getLogger("org").setLevel(Level.ERROR)   //Set for displaying errors in the program if any

    val sc = new SparkContext("local[*]", "CustomerSpending")

    val lines = sc.textFile("../customer-orders.csv")

    val field = lines.map(x => (x.split(",")(0).toInt, x.split(",")(2).toFloat))

    val collectThemAll = field.reduceByKey((x,y) => x+y)

    val sorted = collectThemAll.sortByKey().collect()

    sorted.foreach(println)



  }

}

1 Ответ

0 голосов
/ 02 мая 2020

Spark применяет преобразования лениво , т.е. только когда вы вызываете действие типа collect или take et c. Таким образом, ваш звонок на sortByKey() применяется только после того, как вы позвоните на collect.

. Я создал приложение на основе ваших образцов данных. Я напечатал зависимость RDD, используя toDebugString, чтобы вы могли получить представление о том, что происходит за кулисами.

Приложение

import org.apache.spark.sql.SparkSession

object PlaygroundApp extends App {

  val spark = SparkSession
    .builder()
    .appName("Stackoverflow App")
    .master("local[*]")
    .getOrCreate()

  val sc = spark.sparkContext

  val lines = sc.parallelize(Seq(
    (0, 5524.9497),
    (51, 4975.2197),
    (1, 4958.5996),
    (52, 5245.0605),
    (2, 5994.591),
    (53, 4945.3),
    (9, 5322.6494),
    (10, 4819.6997))
  )

  val collectThemAll = lines.reduceByKey((x, y) => x + y)

  println("---Before sort")

  collectThemAll.foreach(println)
  println(collectThemAll.toDebugString)
  println()

  println("---After sort")  
  val sorted = collectThemAll.sortByKey()
  sorted.collect().foreach(println)
  println(sorted.toDebugString)
}

Вывод

---Before sort
(2,5994.591)
(53,4945.3)
(0,5524.9497)
(52,5245.0605)
(10,4819.6997)
(9,5322.6494)
(1,4958.5996)
(51,4975.2197)
(12) ShuffledRDD[1] at reduceByKey at PlaygroundApp.scala:28 []
 +-(12) ParallelCollectionRDD[0] at parallelize at PlaygroundApp.scala:17 []

---After sort
(0,5524.9497)
(1,4958.5996)
(2,5994.591)
(9,5322.6494)
(10,4819.6997)
(51,4975.2197)
(52,5245.0605)
(53,4945.3)
(8) ShuffledRDD[4] at sortByKey at PlaygroundApp.scala:37 []
 +-(12) ShuffledRDD[1] at reduceByKey at PlaygroundApp.scala:28 []
    +-(12) ParallelCollectionRDD[0] at parallelize at PlaygroundApp.scala:17 []
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...