Значение столбца не выравнивается должным образом после разворота-разворачивания в Spark 2.4 - PullRequest
0 голосов
/ 04 августа 2020

Первый фрейм данных Spark 2.4 выглядит так:

Salesperson_21: Customer_575,Customer_2703,Customer_2682,Customer_2615
Salesperson_11: Customer_454,Customer_158,Customer_1859,Customer_2605
Salesperson_10: Customer_1760,Customer_613,Customer_3008,Customer_1265
Salesperson_4: Customer_1545,Customer_1312,Customer_861,Customer_2178

Второй фрейм данных выглядит как на изображении.

enter image description here

Code:

val df1 = spark.read.schema("Salesperson STRING, Customer STRING")
  .option("sep", ":")
  .csv("D:/New folder/prints/salesperson.txt".split(System.lineSeparator()).toSeq.toDS())
df1.show(false)
df1.printSchema()

val df2 = spark.read
  .option("sep", ",")
  .option("inferSchema", "true")
  .option("header", "true")
  .option("nullValue", "null")
  .csv("D:/New folder/prints/Type.csv")
df2.show(false)
df2.printSchema()

Here is the problem. when i am printing schema i am getting like. enter image description here

val processedDF = df1.withColumn("Customer", explode(split(trim(col("Customer")), ",")))
processedDF.show(false)

processedDF.join(df2, Seq("Customer"), "left")
  .groupBy("Customer")
  .agg(count("Type").as("Occurance"), first("Salesperson").as("Salesperson"))
  .show(false)

enter image description here

Expected Doutput

введите описание изображения здесь

Ожидаемый результат, чтобы привлечь всех клиентов со своим продавцом из фрейма данных1 и сколько раз клиент присутствовал (называемый возникновением) во фрейме данных 2. Я добавил снимок изображения для большей наглядности. Пожалуйста, помогите мне.

1 Ответ

1 голос
/ 04 августа 2020

Вам не нужно stack здесь -

  1. читать текстовый файл с помощью разделителя :
  2. split и explode клиенты
  3. Left Join с помощью df2
  4. Вычислить вхождение

Загрузить данные 1. Прочтите текстовый файл с помощью разделителя :

 val data1 =
      """
        |Salesperson_21: Customer_575,Customer_2703,Customer_2682,Customer_2615
        |Salesperson_11: Customer_454,Customer_158,Customer_1859,Customer_2605
        |Salesperson_10: Customer_1760,Customer_613,Customer_3008,Customer_1265
        |Salesperson_4: Customer_1545,Customer_1312,Customer_861,Customer_2178
      """.stripMargin
//    val stringDS1 = data1.split(System.lineSeparator())
//      .map(_.split("\\|").map(_.replaceAll("""^[ \t]+|[ \t]+$""", "")).mkString(","))
//      .toSeq.toDS()
    val df1 = spark.read.schema("Salesperson STRING, Customer STRING")
      .option("sep", ":")
      .csv(data1.split(System.lineSeparator()).toSeq.toDS())
    df1.show(false)
    df1.printSchema()
    /**
      * +--------------+-------------------------------------------------------+
      * |Salesperson   |Customer                                               |
      * +--------------+-------------------------------------------------------+
      * |Salesperson_21| Customer_575,Customer_2703,Customer_2682,Customer_2615|
      * |Salesperson_11| Customer_454,Customer_158,Customer_1859,Customer_2605 |
      * |Salesperson_10| Customer_1760,Customer_613,Customer_3008,Customer_1265|
      * |Salesperson_4 | Customer_1545,Customer_1312,Customer_861,Customer_2178|
      * +--------------+-------------------------------------------------------+
      *
      * root
      * |-- Salesperson: string (nullable = true)
      * |-- Customer: string (nullable = true)
      */


    val data2 =
      """
        |Type  |Customer
        |shop  |Customer_17
        |Home  |Customer_2703
        |shop  |Customer_2703
        |Home  |Customer_575
        |Shop  |Customer_202
      """.stripMargin
    val stringDS2 = data2.split(System.lineSeparator())
      .map(_.split("\\|").map(_.replaceAll("""^[ \t]+|[ \t]+$""", "")).mkString(","))
      .toSeq.toDS()
    val df2 = spark.read
      .option("sep", ",")
      .option("inferSchema", "true")
      .option("header", "true")
      .option("nullValue", "null")
      .csv(stringDS2)
    df2.show(false)
    df2.printSchema()
    /**
      * +----+-------------+
      * |Type|Customer     |
      * +----+-------------+
      * |shop|Customer_17  |
      * |Home|Customer_2703|
      * |shop|Customer_2703|
      * |Home|Customer_575 |
      * |Shop|Customer_202 |
      * +----+-------------+
      *
      * root
      * |-- Type: string (nullable = true)
      * |-- Customer: string (nullable = true)
      */

2. split и explode клиенты


    val processedDF = df1.withColumn("Customer", explode(split(trim(col("Customer")), ",")))
     processedDF.show(false)
    /**
      * +--------------+-------------+
      * |Salesperson   |Customer     |
      * +--------------+-------------+
      * |Salesperson_21|Customer_575 |
      * |Salesperson_21|Customer_2703|
      * |Salesperson_21|Customer_2682|
      * |Salesperson_21|Customer_2615|
      * |Salesperson_11|Customer_454 |
      * |Salesperson_11|Customer_158 |
      * |Salesperson_11|Customer_1859|
      * |Salesperson_11|Customer_2605|
      * |Salesperson_10|Customer_1760|
      * |Salesperson_10|Customer_613 |
      * |Salesperson_10|Customer_3008|
      * |Salesperson_10|Customer_1265|
      * |Salesperson_4 |Customer_1545|
      * |Salesperson_4 |Customer_1312|
      * |Salesperson_4 |Customer_861 |
      * |Salesperson_4 |Customer_2178|
      * +--------------+-------------+
      */

3. Left Join с df2 & 4. Вычислить вхождение


    processedDF.join(df2, Seq("Customer"), "left")
      .groupBy("Customer")
      .agg(count("Type").as("Occurance"), first("Salesperson").as("Salesperson"))
      .show(false)

    /**
      * +-------------+---------+--------------+
      * |Customer     |Occurance|Salesperson   |
      * +-------------+---------+--------------+
      * |Customer_1312|0        |Salesperson_4 |
      * |Customer_1545|0        |Salesperson_4 |
      * |Customer_1760|0        |Salesperson_10|
      * |Customer_2682|0        |Salesperson_21|
      * |Customer_2703|2        |Salesperson_21|
      * |Customer_3008|0        |Salesperson_10|
      * |Customer_454 |0        |Salesperson_11|
      * |Customer_613 |0        |Salesperson_10|
      * |Customer_1265|0        |Salesperson_10|
      * |Customer_158 |0        |Salesperson_11|
      * |Customer_1859|0        |Salesperson_11|
      * |Customer_2178|0        |Salesperson_4 |
      * |Customer_2605|0        |Salesperson_11|
      * |Customer_2615|0        |Salesperson_21|
      * |Customer_575 |1        |Salesperson_21|
      * |Customer_861 |0        |Salesperson_4 |
      * +-------------+---------+--------------+
      */
...