У меня есть DataFrame в две строки и несколько столбцов, как перенести в два столбца и несколько строк? - PullRequest
0 голосов
/ 06 ноября 2019

У меня есть искра DataFrame, подобная этой:

+---+---+---+---+---+---+---+
| f1| f2| f3| f4| f5| f6| f7|
+---+---+---+---+---+---+---+
|  5|  4|  5|  2|  5|  5|  5|
+---+---+---+---+---+---+---+

как вы можете повить к

+---+---+
| f1|  5|
+---+---+
| f2|  4|
+---+---+
| f3|  5|
+---+---+
| f4|  2|
+---+---+
| f5|  5|
+---+---+
| f6|  5|
+---+---+
| f7|  5|
+---+---+

Есть ли простой код в искровой скале, который можно использовать для транспонирования?

Ответы [ 3 ]

0 голосов
/ 06 ноября 2019
spark 2.4+ use map_from_arrays
scala> var df =Seq(( 5,  4,  5,  2,  5,  5,  5)).toDF("f1", "f2", "f3", "f4", "f5", "f6", "f7")

scala> df.select(array('*).as("v"), lit(df.columns).as("k")).select('v.getItem(0).as("cust_id"), map_from_arrays('k,'v).as("map")).select(explode('map)).show(false)
+---+-----+
|key|value|
+---+-----+
|f1 |5    |
|f2 |4    |
|f3 |5    |
|f4 |2    |
|f5 |5    |
|f6 |5    |
|f7 |5    |
+---+-----+

надеюсь, это поможет вам.

0 голосов
/ 07 ноября 2019

Я написал функцию

object DT {
  val KEY_COL_NAME = "dt_key"
  val VALUE_COL_NAME = "dt_value"

  def pivot(df: DataFrame, valueDataType: DataType, cols: Array[String], keyColName: String, valueColName: String): DataFrame = {
    val tempData: RDD[Row] = df.rdd.flatMap(row => row.getValuesMap(cols).map(Row.fromTuple))
    val keyStructField = DataTypes.createStructField(keyColName, DataTypes.StringType, false)
    val valueStructField = DataTypes.createStructField(valueColName, DataTypes.StringType, true)
    val structType = DataTypes.createStructType(Array(keyStructField, valueStructField))
    df.sparkSession.createDataFrame(tempData, structType).select(col(keyColName), col(valueColName).cast(valueDataType))
  }

  def pivot(df: DataFrame, valueDataType: DataType): DataFrame = {
    pivot(df, valueDataType, df.columns, KEY_COL_NAME, VALUE_COL_NAME)
  }
}

она работала

df.show()
DT.pivot(df,DoubleType).show()

вот так

+---+---+-----------+---+---+       +------+-----------+
| f1| f2|         f3| f4| f5|       |dt_key|   dt_value|
+---+---+-----------+---+---+  to   +------+-----------+
|100|  1|0.355072464|  0| 31|       |    f1|      100.0|
+---+---+-----------+---+---+       |    f5|       31.0|
                                    |    f3|0.355072464|
                                    |    f4|        0.0|
                                    |    f2|        1.0|
                                    +------+-----------+

и

+---+---+-----------+-----------+---+        +------+-----------+
| f1| f2|         f3|         f4| f5|        |dt_key|   dt_value|
+---+---+-----------+-----------+---+  to    +------+-----------+
|100|  1|0.355072464|          0| 31|        |    f1|      100.0|
| 63|  2|0.622775801|0.685809375| 16|        |    f5|       31.0|
+---+---+-----------+-----------+---+        |    f3|0.355072464|
                                             |    f4|        0.0|
                                             |    f2|        1.0|
                                             |    f1|       63.0|
                                             |    f5|       16.0|
                                             |    f3|0.622775801|
                                             |    f4|0.685809375|
                                             |    f2|        2.0|
                                             +------+-----------+

очень приятно!

0 голосов
/ 06 ноября 2019
scala> df.show()
+---+---+---+---+---+---+---+
| f1| f2| f3| f4| f5| f6| f7|
+---+---+---+---+---+---+---+
|  5|  4|  5|  2|  5|  5|  5|
+---+---+---+---+---+---+---+


scala> import org.apache.spark.sql.DataFrame

scala> def transposeUDF(transDF: DataFrame, transBy: Seq[String]): DataFrame = {
     |   val (cols, types) = transDF.dtypes.filter{ case (c, _) => !transBy.contains(c)}.unzip
     |   require(types.distinct.size == 1)      
     | 
     |   val kvs = explode(array(
     |     cols.map(c => struct(lit(c).alias("columns"), col(c).alias("value"))): _*
     |   ))
     | 
     |   val byExprs = transBy.map(col(_))
     | 
     |   transDF
     |     .select(byExprs :+ kvs.alias("_kvs"): _*)
     |     .select(byExprs ++ Seq($"_kvs.columns", $"_kvs.value"): _*)
     | }

scala> val df1 = df.withColumn("tempColumn", lit("1"))

scala> transposeUDF(df1, Seq("tempColumn")).drop("tempColumn").show(false)
+-------+-----+
|columns|value|
+-------+-----+
|f1     |5    |
|f2     |4    |
|f3     |5    |
|f4     |2    |
|f5     |5    |
|f6     |5    |
|f7     |5    |
+-------+-----+
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...