The approach below may solve your problem:
import org.apache.spark.sql.functions._
// Load the raw sales file: spark.read.text yields a single string column named "value",
// one row per line of the file.
val SalespersontextDF = spark.read.text("/home/sathya/Desktop/stackoverflo/data/sales.txt")

// Build the argument list for SQL stack(): one ('colName', colValue-as-string) pair
// per input column, so every column is unpivoted into (Salesperson, Customer) rows.
val stackPairs = SalespersontextDF.columns
  .map(c => s"'$c', cast(`$c` as string)")
  .mkString(", ")

// stack(n, k1, v1, ..., kn, vn) emits n rows of (key, value); here the key is the
// literal source column name (hence "value" in the first column of the sample output).
val processedDF = SalespersontextDF.selectExpr(
  s"stack(${SalespersontextDF.columns.length}, $stackPairs) as (Salesperson, Customer)"
)
processedDF.show(false)
/*
+-----------+----------------------------------------------------------------------+
|Salesperson|Customer                                                              |
+-----------+----------------------------------------------------------------------+
|value      |Salesperson_21: Customer_575,Customer_2703,Customer_2682,Customer_2615|
|value      |Salesperson_11: Customer_454,Customer_158,Customer_1859,Customer_2605 |
|value      |Salesperson_10: Customer_1760,Customer_613,Customer_3008,Customer_1265|
|value      |Salesperson_4: Customer_1545,Customer_1312,Customer_861,Customer_2178 |
+-----------+----------------------------------------------------------------------+
*/
// Split each "Salesperson_N: cust1,cust2,..." line once on ':' and trim both halves.
// Fixes two issues in the earlier version: split() was evaluated twice per row, and
// getItem(1) kept the leading space that follows the colon in the raw data.
val parts = split($"Customer", ":")
processedDF
  .withColumn("Salesperson", trim(parts.getItem(0)))
  .withColumn("Customer", trim(parts.getItem(1)))
  .show(false)
/*
+--------------+------------------------------------------------------+
|Salesperson   |Customer                                              |
+--------------+------------------------------------------------------+
|Salesperson_21|Customer_575,Customer_2703,Customer_2682,Customer_2615|
|Salesperson_11|Customer_454,Customer_158,Customer_1859,Customer_2605 |
|Salesperson_10|Customer_1760,Customer_613,Customer_3008,Customer_1265|
|Salesperson_4 |Customer_1545,Customer_1312,Customer_861,Customer_2178|
+--------------+------------------------------------------------------+
*/