Как разделить запятые нескольких столбцов на несколько строк? - PullRequest
3 голосов
/ 28 июня 2019

У меня есть фрейм данных с N полями, как указано ниже.Количество столбцов и длина значения будут различаться.

Входная таблица:

+--------------+-----------+-----------------------+
|Date          |Amount     |Status                 |
+--------------+-----------+-----------------------+
|2019,2018,2017|100,200,300|IN,PRE,POST            |
|2018          |73         |IN                     |
|2018,2017     |56,89      |IN,PRE                 |
+--------------+-----------+-----------------------+

Я должен преобразовать его в приведенный ниже формат с одним столбцом последовательности.

Таблица ожидаемых выходных данных:

+-------------+------+---------+
|Date  |Amount|Status| Sequence|
+------+------+------+---------+
|2019  |100   |IN    |   1     |
|2018  |200   |PRE   |   2     |
|2017  |300   |POST  |   3     |
|2018  |73    |IN    |   1     |
|2018  |56    |IN    |   1     |
|2017  |89    |PRE   |   2     |
+-------------+------+---------+

Я пробовал использовать разнесение, но разнесение занимает только один массив за раз.

var df = dataRefined.withColumn("TOT_OVRDUE_TYPE", explode(split($"TOT_OVRDUE_TYPE", "\\"))).toDF

var df1 = df.withColumn("TOT_OD_TYPE_AMT", explode(split($"TOT_OD_TYPE_AMT", "\\"))).show 

Кто-то знает, как я могу это сделать?Спасибо за вашу помощь.

Ответы [ 6 ]

1 голос
/ 28 июня 2019

Этого можно добиться, используя Встроенные функции Dataframe arrays_zip, split, posexplode

Explanation:

scala>val df=Seq((("2019,2018,2017"),("100,200,300"),("IN,PRE,POST")),(("2018"),("73"),("IN")),(("2018,2017"),("56,89"),("IN,PRE"))).toDF("date","amount","status")

scala>:paste
df.selectExpr("""posexplode(
                            arrays_zip(
                                        split(date,","), //split date string with ',' to create array
                                        split(amount,","),
                                        split(status,","))) //zip arrays
                            as (p,colum) //pos explode on zip arrays will give position and column value
            """)
    .selectExpr("colum.`0` as Date", //get 0 column as date
                "colum.`1` as Amount", 
                "colum.`2` as Status", 
                "p+1 as Sequence") //add 1 to the position value
    .show()

Result:

+----+------+------+--------+
|Date|Amount|Status|Sequence|
+----+------+------+--------+
|2019|   100|    IN|       1|
|2018|   200|   PRE|       2|
|2017|   300|  POST|       3|
|2018|    73|    IN|       1|
|2018|    56|    IN|       1|
|2017|    89|   PRE|       2|
+----+------+------+--------+
1 голос
/ 28 июня 2019

Вот еще один подход, использующий posexplode для каждого столбца и объединяющий все созданные кадры данных в один:

import org.apache.spark.sql.functions. {Posexplode, monotonically_increasing_id, col}

val df = Seq(
  (Seq("2019", "2018", "2017"), Seq("100", "200", "300"), Seq("IN", "PRE", "POST")),
  (Seq("2018"), Seq("73"), Seq("IN")),
  (Seq("2018", "2017"), Seq("56", "89"), Seq("IN", "PRE")))
.toDF("Date","Amount", "Status")
.withColumn("idx", monotonically_increasing_id)

df.columns.filter(_ != "idx").map{
  c => df.select($"idx", posexplode(col(c))).withColumnRenamed("col", c)
}
.reduce((ds1, ds2) => ds1.join(ds2, Seq("idx", "pos")))
.select($"Date", $"Amount", $"Status", $"pos".plus(1).as("Sequence"))
.show

Выход:

+----+------+------+--------+
|Date|Amount|Status|Sequence|
+----+------+------+--------+
|2019|   100|    IN|       1|
|2018|   200|   PRE|       2|
|2017|   300|  POST|       3|
|2018|    73|    IN|       1|
|2018|    56|    IN|       1|
|2017|    89|   PRE|       2|
+----+------+------+--------+
1 голос
/ 28 июня 2019

Да, лично я нахожу explode немного раздражающим, и в вашем случае я бы, вероятно, выбрал flatMap вместо:

import spark.implicits._
import org.apache.spark.sql.Row
val df = spark.sparkContext.parallelize(Seq((Seq(2019,2018,2017), Seq(100,200,300), Seq("IN","PRE","POST")),(Seq(2018), Seq(73), Seq("IN")),(Seq(2018,2017), Seq(56,89), Seq("IN","PRE")))).toDF()

val transformedDF = df
  .flatMap{case Row(dates: Seq[Int], amounts: Seq[Int], statuses: Seq[String]) =>
     dates.indices.map(index => (dates(index), amounts(index), statuses(index), index+1))}
  .toDF("Date", "Amount", "Status", "Sequence")

Вывод:

df.show
+----+------+------+--------+
|Date|Amount|Status|Sequence|
+----+------+------+--------+
|2019|   100|    IN|       1|
|2018|   200|   PRE|       2|
|2017|   300|  POST|       3|
|2018|    73|    IN|       1|
|2018|    56|    IN|       1|
|2017|    89|   PRE|       2|
+----+------+------+--------+
0 голосов
/ 28 июня 2019

Вот почему я люблю API-интерфейсы Spark-Core. С помощью map и flatMap вы можете справиться со многими проблемами. Просто передайте ваш df и экземпляр SQLContext нижеуказанному методу, и он даст желаемый результат -

def reShapeDf(df:DataFrame, sqlContext: SQLContext): DataFrame ={

    val rdd = df.rdd.map(m => (m.getAs[String](0),m.getAs[String](1),m.getAs[String](2)))

    val rdd1 = rdd.flatMap(a => a._1.split(",").zip(a._2.split(",")).zip(a._3.split(",")))
    val rdd2 = rdd1.map{
      case ((a,b),c) => (a,b,c)
    }

    sqlContext.createDataFrame(rdd2.map(m => Row.fromTuple(m)),df.schema)
}
0 голосов
/ 28 июня 2019

Я воспользовался транспонированием, чтобы сжать все последовательности по позициям, а затем сделал позэксплод.Выборки на фреймах данных являются динамическими для удовлетворения условия: Количество столбцов и длина значения будут варьироваться в вопросе.

import org.apache.spark.sql.functions._


val df = Seq(
  ("2019,2018,2017", "100,200,300", "IN,PRE,POST"),
  ("2018", "73", "IN"),
  ("2018,2017", "56,89", "IN,PRE")
).toDF("Date", "Amount", "Status")
df: org.apache.spark.sql.DataFrame = [Date: string, Amount: string ... 1 more field]

scala> df.show(false)
+--------------+-----------+-----------+
|Date          |Amount     |Status     |
+--------------+-----------+-----------+
|2019,2018,2017|100,200,300|IN,PRE,POST|
|2018          |73         |IN         |
|2018,2017     |56,89      |IN,PRE     |
+--------------+-----------+-----------+


scala> def transposeSeqOfSeq[S](x:Seq[Seq[S]]): Seq[Seq[S]] = { x.transpose }
transposeSeqOfSeq: [S](x: Seq[Seq[S]])Seq[Seq[S]]

scala> val myUdf = udf { transposeSeqOfSeq[String] _}
myUdf: org.apache.spark.sql.expressions.UserDefinedFunction = UserDefinedFunction(<function1>,ArrayType(ArrayType(StringType,true),true),Some(List(ArrayType(ArrayType(StringType,true),true))))

scala> val df2 = df.select(df.columns.map(c => split(col(c), ",") as c): _*)
df2: org.apache.spark.sql.DataFrame = [Date: array<string>, Amount: array<string> ... 1 more field]

scala> df2.show(false)
+------------------+---------------+---------------+
|Date              |Amount         |Status         |
+------------------+---------------+---------------+
|[2019, 2018, 2017]|[100, 200, 300]|[IN, PRE, POST]|
|[2018]            |[73]           |[IN]           |
|[2018, 2017]      |[56, 89]       |[IN, PRE]      |
+------------------+---------------+---------------+


scala> val df3 = df2.withColumn("allcols", array(df.columns.map(c => col(c)): _*))
df3: org.apache.spark.sql.DataFrame = [Date: array<string>, Amount: array<string> ... 2 more fields]

scala> df3.show(false)
+------------------+---------------+---------------+------------------------------------------------------+
|Date              |Amount         |Status         |allcols                                               |
+------------------+---------------+---------------+------------------------------------------------------+
|[2019, 2018, 2017]|[100, 200, 300]|[IN, PRE, POST]|[[2019, 2018, 2017], [100, 200, 300], [IN, PRE, POST]]|
|[2018]            |[73]           |[IN]           |[[2018], [73], [IN]]                                  |
|[2018, 2017]      |[56, 89]       |[IN, PRE]      |[[2018, 2017], [56, 89], [IN, PRE]]                   |
+------------------+---------------+---------------+------------------------------------------------------+


scala> val df4 = df3.withColumn("ab", myUdf($"allcols")).select($"ab", posexplode($"ab"))
df4: org.apache.spark.sql.DataFrame = [ab: array<array<string>>, pos: int ... 1 more field]

scala> df4.show(false)
+------------------------------------------------------+---+-----------------+
|ab                                                    |pos|col              |
+------------------------------------------------------+---+-----------------+
|[[2019, 100, IN], [2018, 200, PRE], [2017, 300, POST]]|0  |[2019, 100, IN]  |
|[[2019, 100, IN], [2018, 200, PRE], [2017, 300, POST]]|1  |[2018, 200, PRE] |
|[[2019, 100, IN], [2018, 200, PRE], [2017, 300, POST]]|2  |[2017, 300, POST]|
|[[2018, 73, IN]]                                      |0  |[2018, 73, IN]   |
|[[2018, 56, IN], [2017, 89, PRE]]                     |0  |[2018, 56, IN]   |
|[[2018, 56, IN], [2017, 89, PRE]]                     |1  |[2017, 89, PRE]  |
+------------------------------------------------------+---+-----------------+

scala> val selCols = (0 until df.columns.length).map(i => $"col".getItem(i).as(df.columns(i))) :+ ($"pos"+1).as("Sequence")
selCols: scala.collection.immutable.IndexedSeq[org.apache.spark.sql.Column] = Vector(col[0] AS `Date`, col[1] AS `Amount`, col[2] AS `Status`, (pos + 1) AS `Sequence`)

scala> df4.select(selCols:_*).show(false)
+----+------+------+--------+
|Date|Amount|Status|Sequence|
+----+------+------+--------+
|2019|100   |IN    |1       |
|2018|200   |PRE   |2       |
|2017|300   |POST  |3       |
|2018|73    |IN    |1       |
|2018|56    |IN    |1       |
|2017|89    |PRE   |2       |
+----+------+------+--------+
0 голосов
/ 28 июня 2019

Предполагая, что количество элементов данных в каждом столбце одинаково для каждой строки:

Сначала я пересоздал ваш DataFrame

import org.apache.spark.sql._
import scala.collection.mutable.ListBuffer

val df = Seq(("2019,2018,2017", "100,200,300", "IN,PRE,POST"), ("2018", "73", "IN"),
  ("2018,2017", "56,89", "IN,PRE")).toDF("Date", "Amount", "Status")

Затем я разделил строки и добавил значение последовательности, а затем преобразовал обратно в DF:

val exploded = df.rdd.flatMap(row => {
  val buffer = new ListBuffer[(String, String, String, Int)]
  val dateSplit = row(0).toString.split("\\,", -1)
  val amountSplit = row(1).toString.split("\\,", -1)
  val statusSplit = row(2).toString.split("\\,", -1)
  val seqSize = dateSplit.size
  for(i <- 0 to seqSize-1)
    buffer += Tuple4(dateSplit(i), amountSplit(i), statusSplit(i), i+1)
  buffer.toList
}).toDF((df.columns:+"Sequence"): _*)

Я уверен, что есть другие способы сделать это без предварительного преобразования DF в RDD, но это все равно приведет к DF с правильным ответом.

Дайте мне знать, если у вас есть какие-либо вопросы.

...