Scala - Как преобразовать Spark DataFrame в карту - PullRequest
0 голосов
/ 08 апреля 2020

Как преобразовать Spark DataFrame в Map, как показано ниже: Я хочу преобразовать в Map, а затем Json. Pivot не работал, чтобы изменить форму cplumn, поэтому любая помощь будет полезна для преобразования в виде карты, как показано ниже.

Входной DataFrame:

        +-----+-----+-------+--------------------+
        |col1 |col2 |object | values             |
        +-------------------+--------------------+
        |one  | two | main  |[101 -> A, 202 -> B]|
        +-------------------+--------------------+

        Expected Output DataFrame :
        +-----+-----+-------+--------------------+------------------------------------------------------------------------+
        |col1 |col2 |object | values             | newMap                                                                 |
        +-----+-----+-------+--------------------+------------------------------------------------------------------------+
        |one  | two |main   |[101 -> A, 202 -> B]|[col1 -> one, col2 -> two, object -> main, main -> [101 -> A, 202 -> B]]|
        +-----+-----+-------+--------------------+------------------------------------------------------------------------+

, как показано ниже, но безуспешно:

    val toMap = udf((col1: String, col2: String, object: String, values: Map[String, String])) => {
    col1.zip(values).toMap // need help for logic
    // col1 -> col1_value, col2 -> col2_values, object -> object_value, object_value -> [values_of_Col_Values].toMap
  })

    df.withColumn("newMap", toMap($"col1", $"col2", $"object", $"values"))
    I am stuck to format the code properly and get the output, please help either in Scala or Spark.

1 Ответ

1 голос
/ 08 апреля 2020

Это прямо сейчас. Очевидно, что предварительное условие равно , вы должны иметь все столбцы с одинаковым типом , иначе вы получите ошибку искры.

import spark.implicits._
import org.apache.spark.sql.functions._
val df = Seq(("Foo", "L", "10"), ("Boo", "XL", "20"))
        .toDF("brand", "size", "sales")

//Prepare your map columns.Bit of nasty iteration work is required
  var preCol: Column = null
  var counter = 1
  val size = df.schema.fields.length
  val mapColumns = df.schema.flatMap { field =>

  val res = if (counter == size)
          Seq(preCol, col(field.name))
        else
          Seq(lit(field.name), col(field.name))

 //assign the current field name for tracking and increment the counter by 1
   preCol = col(field.name)
   counter += 1

   res
  }

df.withColumn("new", map(mapColumns: _*)).show(false)

Результат

+-----+----+-----+---------------------------------------+
|brand|size|sales|new                                    |
+-----+----+-----+---------------------------------------+
|Foo  |L   |10   |Map(brand -> Foo, size -> L, L -> 10)  |
|Boo  |XL  |20   |Map(brand -> Boo, size -> XL, XL -> 20)|
+-----+----+-----+---------------------------------------+
...