Сложный парсинг файлов в Spark 2.4 - PullRequest
0 голосов
/ 04 августа 2020

Spark с scala 2,4

Мои исходные данные выглядят так, как показано ниже.

Salesperson_21: Customer_575,Customer_2703,Customer_2682,Customer_2615
Salesperson_11: Customer_454,Customer_158,Customer_1859,Customer_2605
Salesperson_10: Customer_1760,Customer_613,Customer_3008,Customer_1265
Salesperson_4: Customer_1545,Customer_1312,Customer_861,Customer_2178

Код, используемый для сглаживания файла.

val SalespersontextDF = spark.read.text("D:/prints/sales.txt")
val stringCol = SalespersontextDF.columns.map(c => s"'$c', cast(`$c` as string)").mkString(", ")
    val processedDF = SalespersontextDF.selectExpr(s"stack(${df1.columns.length}, $stringCol) as (Salesperson, Customer)")

введите описание изображения здесь

К сожалению, он не заполняет продавца в правильном поле, вместо номера продавца он заполняет жестко запрограммированное значение как «значение». и номер продавца переместится в другое поле.

Большое спасибо за вашу помощь.

Ответы [ 2 ]

1 голос
/ 04 августа 2020

приведенный ниже подход может решить вашу проблему,


import org.apache.spark.sql.functions._

val SalespersontextDF = spark.read.text("/home/sathya/Desktop/stackoverflo/data/sales.txt")

val stringCol = SalespersontextDF.columns.map(c => s"'$c', cast(`$c` as string)").mkString(", ")

val processedDF = SalespersontextDF.selectExpr(s"stack(${SalespersontextDF.columns.length}, $stringCol) as (Salesperson, Customer)")

processedDF.show(false)
/*
+-----------+----------------------------------------------------------------------+
|Salesperson|Customer                                                              |
+-----------+----------------------------------------------------------------------+
|value      |Salesperson_21: Customer_575,Customer_2703,Customer_2682,Customer_2615|
|value      |Salesperson_11: Customer_454,Customer_158,Customer_1859,Customer_2605 |
|value      |Salesperson_10: Customer_1760,Customer_613,Customer_3008,Customer_1265|
|value      |Salesperson_4: Customer_1545,Customer_1312,Customer_861,Customer_2178 |
+-----------+----------------------------------------------------------------------+
*/

processedDF.withColumn("Salesperson", split($"Customer", ":").getItem(0)).withColumn("Customer", split($"Customer", ":").getItem(1)).show(false)
/*
+--------------+-------------------------------------------------------+
|Salesperson   |Customer                                               |
+--------------+-------------------------------------------------------+
|Salesperson_21| Customer_575,Customer_2703,Customer_2682,Customer_2615|
|Salesperson_11| Customer_454,Customer_158,Customer_1859,Customer_2605 |
|Salesperson_10| Customer_1760,Customer_613,Customer_3008,Customer_1265|
|Salesperson_4 | Customer_1545,Customer_1312,Customer_861,Customer_2178|
+--------------+-------------------------------------------------------+
*/
1 голос
/ 04 августа 2020

попробуйте это -

spark.read
        .schema("Salesperson STRING, Customer STRING")
      .option("sep", ":")
      .csv("D:/prints/sales.txt")
...