Имя столбца внутри столбца фрейма данных в искре с scala - PullRequest
0 голосов
/ 02 августа 2020

введите описание изображения здесь Я использую искру с Scala. 2.4.3

Мой фрейм данных продавца выглядит так: у него всего 54 продавец, я взял пример только из 3 столбцов

Schema of SalesPerson table.
root
 |-- col: struct (nullable = false)
 |    |-- SalesPerson_1: string (nullable = true)
 |    |-- SalesPerson_2: string (nullable = true)
 |    |-- SalesPerson_3: string (nullable = true)

данных представления продавца.

     SalesPerson_1|SalesPerson_2|SalesPerson_3
+++++++++++++++++++++++++++++++++++++++++++++++++++++++++
    [Customer_1793,  Customer_202,  Customer_2461]
    [Customer_2424, Customer_130, Customer_787]
    [Customer_1061, Customer_318, Customer_706]
+++++++++++++++++++++++++++++++++++++++++++++++++++++++++

Фрейм данных моей торговой площадки выглядит как

Schema of salesplace
 
 root
 |-- Place: string (nullable = true)
 |-- Customer: string (nullable = true)

Data of salesplace
Place|Customer
Online| Customer_1793
Retail| Customer_1793
Retail| Customer_130
Online| Customer_130
Online| Customer_2461
Retail| Customer_2461
Online| Customer_2461

Я пытаюсь проверить, какие клиенты из таблицы «Продавец» доступны в таблице SalesPlace. с двумя additional column shows customer belong to salesperson

и количеством появлений клиентов в таблице SalesPlace, для

Ожидаемый результат:

CustomerBelongstoSalesperson|Customer     |occurance|
SalesPerson_1               |Customer_1793|2
SalesPerson_2               |Customer_130 |2 
SalesPerson_3               |Customer_2461|3
SalesPerson_2               |Customer_202 |0
SalesPerson_1               |Customer_2424|0
SalesPerson_1               |Customer_1061|0
SalesPerson_2               |Customer_318 |0
SalesPerson_3               |Customer_787 |0

Код:

Error:
The number of aliases supplied in the AS clause does not match the number of columns output by the UDTF expected 54 aliases but got Salesperson,Customer ;

В искре это кажется немного критичным. Я не уверен, можно ли указать имя столбца внутри столбца в качестве значения .... Может кто-нибудь, пожалуйста, помогите мне с идеей, как это сделать ........ Спасибо

1 Ответ

1 голос
/ 02 августа 2020

Попробуйте это -

Загрузите предоставленные тестовые данные

 val data1 =
      """
        |salesperson1          |  salesperson2
        |Customer_17         |Customer_202
        |Customer_24         |Customer_130
      """.stripMargin
    val stringDS1 = data1.split(System.lineSeparator())
      .map(_.split("\\|").map(_.replaceAll("""^[ \t]+|[ \t]+$""", "")).mkString(","))
      .toSeq.toDS()
    val df1 = spark.read
      .option("sep", ",")
      .option("inferSchema", "true")
      .option("header", "true")
      .option("nullValue", "null")
      .csv(stringDS1)
    df1.show(false)
    df1.printSchema()
    /**
      * +------------+------------+
      * |salesperson1|salesperson2|
      * +------------+------------+
      * |Customer_17 |Customer_202|
      * |Customer_24 |Customer_130|
      * +------------+------------+
      *
      * root
      * |-- salesperson1: string (nullable = true)
      * |-- salesperson2: string (nullable = true)
      */

    val data2 =
      """
        |Place  |Customer
        |shop  |Customer_17
        |Home  |Customer_17
        |shop  |Customer_17
        |Home  |Customer_130
        |Shop  |Customer_202
      """.stripMargin
    val stringDS2 = data2.split(System.lineSeparator())
      .map(_.split("\\|").map(_.replaceAll("""^[ \t]+|[ \t]+$""", "")).mkString(","))
      .toSeq.toDS()
    val df2 = spark.read
      .option("sep", ",")
      .option("inferSchema", "true")
      .option("header", "true")
      .option("nullValue", "null")
      .csv(stringDS2)
    df2.show(false)
    df2.printSchema()
    /**
      * +-----+------------+
      * |Place|Customer    |
      * +-----+------------+
      * |shop |Customer_17 |
      * |Home |Customer_17 |
      * |shop |Customer_17 |
      * |Home |Customer_130|
      * |Shop |Customer_202|
      * +-----+------------+
      *
      * root
      * |-- Place: string (nullable = true)
      * |-- Customer: string (nullable = true)
      */

Unpivot и left join

  val stringCol = df1.columns.map(c => s"'$c', cast(`$c` as string)").mkString(", ")
    val processedDF = df1.selectExpr(s"stack(${df1.columns.length}, $stringCol) as (Salesperson, Customer)")
    processedDF.show(false)
    /**
      * +------------+------------+
      * |Salesperson |Customer    |
      * +------------+------------+
      * |salesperson1|Customer_17 |
      * |salesperson2|Customer_202|
      * |salesperson1|Customer_24 |
      * |salesperson2|Customer_130|
      * +------------+------------+
      */

    processedDF.join(df2, Seq("Customer"), "left")
      .groupBy("Customer")
      .agg(count("Place").as("Occurance"), first("Salesperson").as("Salesperson"))
      .show(false)

    /**
      * +------------+---------+------------+
      * |Customer    |Occurance|Salesperson |
      * +------------+---------+------------+
      * |Customer_130|1        |salesperson2|
      * |Customer_17 |3        |salesperson1|
      * |Customer_202|1        |salesperson2|
      * |Customer_24 |0        |salesperson1|
      * +------------+---------+------------+
      */
...