Как преобразовать вложенную карту Spark Scala в структуру данных карты? - PullRequest
1 голос
/ 23 апреля 2020

Я хочу написать вложенную структуру данных, состоящую из карты, внутри другой карты, используя массив класса Scala.

Результат должен преобразовать этот фрейм данных:

|Value|Country| Timestamp| Sum|
+-----+-------+----------+----+
|  123|    ITA|1475600500|18.0|
|  123|    ITA|1475600516|19.0|
+-----+-------+----------+----+

в:

+--------------------------------------------------------------------+
|value                                                               |
+--------------------------------------------------------------------+
[{"value":123,"attributes":{"ITA":{"1475600500":18,"1475600516":19}}}]
+--------------------------------------------------------------------+

Приведенный ниже набор данных actualResult приближает меня, но структура не совсем совпадает с моим ожидаемым фреймом данных.

case class Record(value: Integer, attributes: Map[String, Map[String, BigDecimal]])
val actualResult = df
  .map(r =>
    Array(
      Record(
        r.getAs[Int]("Value"),
        Map(
          r.getAs[String]("Country") ->
            Map(
              r.getAs[String]("Timestamp") -> new BigDecimal(
                r.getAs[Double]("Sum").toString
              )
            )
        )
      )
    )
  )

Столбец Timestamp в наборе данных actualResult не объединяется в одну строку Record, а вместо этого создает две отдельные строки.

+----------------------------------------------------+
|value                                               |
+----------------------------------------------------+
[{"value":123,"attributes":{"ITA":{"1475600516":19}}}]
[{"value":123,"attributes":{"ITA":{"1475600500":18}}}]
+----------------------------------------------------+

1 Ответ

0 голосов
/ 23 апреля 2020

При использовании groupBy и collect_list в комбинированном столбце creatng с использованием struct I удалось получить одну строку, как показано ниже.

val mycsv =
    """
      |Value|Country|Timestamp|Sum
      |  123|ITA|1475600500|18.0
      |  123|ITA|1475600516|19.0
    """.stripMargin('|').lines.toList.toDS()


  val df: DataFrame = spark.read.option("header", true)
    .option("sep", "|")
    .option("inferSchema", true)
    .csv(mycsv)
  df.show

  val df1 = df.
    groupBy("Value","Country")
    .agg(  collect_list(struct(col("Country"), col("Timestamp"), col("Sum"))).alias("attributes")).drop("Country")


  val json = df1.toJSON // you can save in to file
  json.show(false)

Результат, объединенный в 2 строки

+-----+-------+----------+----+
|Value|Country| Timestamp| Sum|
+-----+-------+----------+----+
|123.0|ITA    |1475600500|18.0|
|123.0|ITA    |1475600516|19.0|
+-----+-------+----------+----+

+----------------------------------------------------------------------------------------------------------------------------------------------+
|value                                                                                                                                         |
+----------------------------------------------------------------------------------------------------------------------------------------------+
|{"Value":123.0,"attributes":[{"Country":"ITA","Timestamp":1475600500,"Sum":18.0},{"Country":"ITA","Timestamp":1475600516,"Sum":19.0}]}|
+----------------------------------------------------------------------------------------------------------------------------------------------+

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...