Создайте вложенный JSON из всех строк, имеющих одинаковый идентификатор: DataFrame - PullRequest
0 голосов
/ 13 февраля 2019

У меня есть DataFrame df4 с тремя столбцами

  1. id аннотирующая сущность
  2. data с данными массива JSON
  3. executor_id какстроковое значение

Код для его создания выглядит следующим образом:

val df1 = Seq((1, "n1", "d1")).toDF("id",  "number", "data")

val df2 = df1.withColumn("data", to_json(struct($"number", $"data"))).groupBy("id").agg(collect_list($"data").alias("data")).withColumn("executor_id", lit("e1"))

val df3 = df1.withColumn("data", to_json(struct($"number", $"data"))).groupBy("id").agg(collect_list($"data").alias("data")).withColumn("executor_id", lit("e2"))

val df4 = df2.union(df3)

Содержимое DF4 выглядит как

scala> df4.show(false)
+---+-----------------------------+-----------+
|id |data                         |executor_id|
+---+-----------------------------+-----------+
|1  |[{"number":"n1","data":"d1"}]|e1         |
|1  |[{"number":"n1","data":"d1"}]|e2         |
+---+-----------------------------+-----------+

Мне нужно создать новые данные JSON сexecutor_id в качестве ключа и data в качестве данных json, сгруппировать по id.Результирующие данныеФрейм типа

+---+------------------------------------------------------------------------+
|id |new_data                                                                |
+---+------------------------------------------------------------------------+
|1  |{"e1":[{"number":"n1","data":"d1"}], "e2":[{"number":"n1","data":"d1"}]}|
+---+------------------------------------------------------------------------+

Версии:

Spark: 2.2
Scala: 2.11

1 Ответ

0 голосов
/ 15 февраля 2019

Я пытался решить эту проблему последние три дня и, наконец, смог обойти ее, используя UserDefinedAggregateFunction.Вот пример кода для того же

import org.apache.spark.sql.Row
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types._

import scala.collection.mutable
import scala.collection.mutable.ListBuffer

class CustomAggregator extends UserDefinedAggregateFunction {
  override def inputSchema: org.apache.spark.sql.types.StructType =
    StructType(Array(StructField("key", StringType), StructField("value", ArrayType(StringType))))

  // This is the internal fields you keep for computing your aggregate
  override def bufferSchema: StructType = StructType(
    Array(StructField("mapData", MapType(StringType, ArrayType(StringType))))
  )

  // This is the output type of your aggregatation function.
  override def dataType = StringType

  override def deterministic: Boolean = true

  // This is the initial value for your buffer schema.
  override def initialize(buffer: MutableAggregationBuffer): Unit = {
    buffer(0) = scala.collection.mutable.Map[String, String]()
  }

  // This is how to update your buffer schema given an input.
  override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
    buffer(0) = buffer.getMap(0) + (input.getAs[String](0) -> input.getAs[String](1))
  }

  // This is how to merge two objects with the bufferSchema type.
  override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
    buffer1.update(0, buffer1.getAs[Map[String, Any]](0) ++ buffer2.getAs[Map[String, Any]](0))
  }

  // This is where you output the final value, given the final value of your bufferSchema.
  override def evaluate(buffer: Row): Any = {
    val map = buffer(0).asInstanceOf[Map[Any, Any]]
    val buff: ListBuffer[String] = ListBuffer()
    for ((k, v) <- map) {
      val valArray = v.asInstanceOf[mutable.WrappedArray[Any]].array;
      val tmp = {
        for {
          valString <- valArray
        } yield valString.toString
      }.toList.mkString(",")
      buff += "\"" + k.toString + "\":[" + tmp + "]"
    }
    "{" + buff.toList.mkString(",") + "}"
  }
}

Теперь используйте customAggregator,

val ca = new CustomAggregator
val df5 = df4.groupBy("id").agg(ca(col("executor_id"), col("data")).as("jsonData"))

Результирующий DF равен

scala> df5.show(false)
+---+-----------------------------------------------------------------------+
|id |jsonData                                                               |
+---+-----------------------------------------------------------------------+
|1  |{"e1":[{"number":"n1","data":"d1"}],"e2":[{"number":"n1","data":"d1"}]}|
+---+-----------------------------------------------------------------------+

Даже если я решил эту проблему, яне уверен, правильно ли это или нет.Причины моих сомнений:

  1. Местами я использовал Any.Я не думаю, что это правильно.
  2. Для каждой оценки я создаю ListBuffer и многие другие типы данных.Я не уверен насчет производительности кода.
  3. Мне все еще приходится тестировать код для многих типов данных, таких как double, дата tpye, вложенный json и т. Д. В качестве данных.
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...