Я воспользовался транспонированием, чтобы сжать все последовательности по позициям, а затем сделал позэксплод.Выборки на фреймах данных являются динамическими для удовлетворения условия: Количество столбцов и длина значения будут варьироваться в вопросе.
import org.apache.spark.sql.functions._
val df = Seq(
("2019,2018,2017", "100,200,300", "IN,PRE,POST"),
("2018", "73", "IN"),
("2018,2017", "56,89", "IN,PRE")
).toDF("Date", "Amount", "Status")
df: org.apache.spark.sql.DataFrame = [Date: string, Amount: string ... 1 more field]
scala> df.show(false)
+--------------+-----------+-----------+
|Date |Amount |Status |
+--------------+-----------+-----------+
|2019,2018,2017|100,200,300|IN,PRE,POST|
|2018 |73 |IN |
|2018,2017 |56,89 |IN,PRE |
+--------------+-----------+-----------+
scala> def transposeSeqOfSeq[S](x:Seq[Seq[S]]): Seq[Seq[S]] = { x.transpose }
transposeSeqOfSeq: [S](x: Seq[Seq[S]])Seq[Seq[S]]
scala> val myUdf = udf { transposeSeqOfSeq[String] _}
myUdf: org.apache.spark.sql.expressions.UserDefinedFunction = UserDefinedFunction(<function1>,ArrayType(ArrayType(StringType,true),true),Some(List(ArrayType(ArrayType(StringType,true),true))))
scala> val df2 = df.select(df.columns.map(c => split(col(c), ",") as c): _*)
df2: org.apache.spark.sql.DataFrame = [Date: array<string>, Amount: array<string> ... 1 more field]
scala> df2.show(false)
+------------------+---------------+---------------+
|Date |Amount |Status |
+------------------+---------------+---------------+
|[2019, 2018, 2017]|[100, 200, 300]|[IN, PRE, POST]|
|[2018] |[73] |[IN] |
|[2018, 2017] |[56, 89] |[IN, PRE] |
+------------------+---------------+---------------+
scala> val df3 = df2.withColumn("allcols", array(df.columns.map(c => col(c)): _*))
df3: org.apache.spark.sql.DataFrame = [Date: array<string>, Amount: array<string> ... 2 more fields]
scala> df3.show(false)
+------------------+---------------+---------------+------------------------------------------------------+
|Date |Amount |Status |allcols |
+------------------+---------------+---------------+------------------------------------------------------+
|[2019, 2018, 2017]|[100, 200, 300]|[IN, PRE, POST]|[[2019, 2018, 2017], [100, 200, 300], [IN, PRE, POST]]|
|[2018] |[73] |[IN] |[[2018], [73], [IN]] |
|[2018, 2017] |[56, 89] |[IN, PRE] |[[2018, 2017], [56, 89], [IN, PRE]] |
+------------------+---------------+---------------+------------------------------------------------------+
scala> val df4 = df3.withColumn("ab", myUdf($"allcols")).select($"ab", posexplode($"ab"))
df4: org.apache.spark.sql.DataFrame = [ab: array<array<string>>, pos: int ... 1 more field]
scala> df4.show(false)
+------------------------------------------------------+---+-----------------+
|ab |pos|col |
+------------------------------------------------------+---+-----------------+
|[[2019, 100, IN], [2018, 200, PRE], [2017, 300, POST]]|0 |[2019, 100, IN] |
|[[2019, 100, IN], [2018, 200, PRE], [2017, 300, POST]]|1 |[2018, 200, PRE] |
|[[2019, 100, IN], [2018, 200, PRE], [2017, 300, POST]]|2 |[2017, 300, POST]|
|[[2018, 73, IN]] |0 |[2018, 73, IN] |
|[[2018, 56, IN], [2017, 89, PRE]] |0 |[2018, 56, IN] |
|[[2018, 56, IN], [2017, 89, PRE]] |1 |[2017, 89, PRE] |
+------------------------------------------------------+---+-----------------+
scala> val selCols = (0 until df.columns.length).map(i => $"col".getItem(i).as(df.columns(i))) :+ ($"pos"+1).as("Sequence")
selCols: scala.collection.immutable.IndexedSeq[org.apache.spark.sql.Column] = Vector(col[0] AS `Date`, col[1] AS `Amount`, col[2] AS `Status`, (pos + 1) AS `Sequence`)
scala> df4.select(selCols:_*).show(false)
+----+------+------+--------+
|Date|Amount|Status|Sequence|
+----+------+------+--------+
|2019|100 |IN |1 |
|2018|200 |PRE |2 |
|2017|300 |POST |3 |
|2018|73 |IN |1 |
|2018|56 |IN |1 |
|2017|89 |PRE |2 |
+----+------+------+--------+