Как сгруппировать по элементу структуры и преобразовать его обратно в структуру с той же схемой - PullRequest
1 голос
/ 31 марта 2020

Spark 2.4.5 В моем фрейме данных у меня есть массив struct, и этот массив время от времени содержит снимок поля.

Теперь я ищу способ иметь только снимки при изменении данных.

Моя схема выглядит следующим образом

root 
 |-- fee: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- updated_at: long (nullable = true)
 |    |    |-- fee: float (nullable = true)
 |-- status: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- updated_at: long (nullable = true)
 |    |    |-- status: string (nullable = true)

Существующий вывод:

+------------------------------------------------------------------------+----------------------------------------------------------------------------------------------------------+
|fee                                                                     |status                                                                                       |
+------------------------------------------------------------------------+----------------------------------------------------------------------------------------------------------+
|[[1584579671000, 12.11], [1584579672000, 12.11], [1584579673000, 12.11]]|[[1584579671000, Closed-A], [1584579672000, Closed-A], [1584579673000, Closed-B], [1584579674000, Closed], [1584579675000, Closed-A]]|
+------------------------------------------------------------------------+----------------------------------------------------------------------------------------------------------+

Поскольку столбец 'fee' не изменился, поэтому он должен иметь только одну запись, так как статус изменился несколько раз, поэтому o / p будет [[1584579671000, Closed-A], [1584579673000, Closed-B], [1584579674000, Closed], [1584579675000, Closed-A] ] Обратите внимание, что здесь дважды отображается состояние «Закрыто-A».

Попытка получить следующий вывод:

+------------------------------------------------------------------------+----------------------------------------------------------------------------------------------------------+
|fee                     |status                                                                                        |
+------------------------------------------------------------------------+----------------------------------------------------------------------------------------------------------+
|[[1584579671000, 12.11]]|[[1584579671000, Closed-A], [1584579673000, Closed-B], [1584579674000, Closed], [1584579675000, Closed-A]]|
+------------------------------------------------------------------------+----------------------------------------------------------------------------------------------------------+

Примечание: Попытка не иметь пользовательскую функцию.

1 Ответ

1 голос
/ 01 апреля 2020

Используя API Spark Dataframe, вышеуказанная проблема может быть решена как; Добавьте монотонно увеличивающийся идентификатор для уникальной идентификации каждой записи, разбейте и сгладьте кадр данных, сгруппируйте по плате и статусу отдельно (согласно требованиям), агрегируйте сгруппированный массив данных по идентификатору, чтобы собрать структуру, объедините оба кадра данных с помощью идентификатора, идентификатор может быть удален в окончательный datafarme.

import org.apache.spark.sql.functions.explode
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.functions.monotonically_increasing_id
import org.apache.spark.sql.functions.collect_list
import org.apache.spark.sql.functions.struct

val idDF = df.withColumn("id", monotonically_increasing_id)

val explodeDf = idDF
  .select(col("id"), col("status"), explode(col("fee")).as("fee"))
  .select(col("id"), col("fee"), explode(col("status")).as("status"))

val flatDF = explodeDf.select(col("id"), col("fee.fee"), col("fee.updated_at").as("updated_at_fee"), col("status.status"), col("status.updated_at").as("updated_at_status"))

val feeDF = flatDF.groupBy("id", "fee").min("updated_at_fee")
val feeSelectDF = feeDF.select(col("id"), col("fee"), col("min(updated_at_fee)").as("updated_at"))
val feeAggDF = feeSelectDF.groupBy("id").agg(collect_list(struct("fee", "updated_at")).as("fee"))


val statusDF = flatDF.groupBy("id", "status").min("updated_at_status")
val statusSelectDF = statusDF.select(col("id"), col("status"), col("min(updated_at_status)").as("updated_at"))
val statusAggDF = statusSelectDF.groupBy("id").agg(collect_list(struct("status", "updated_at")).as("status"))

val finalDF = feeAggDF.join(statusAggDF, "id")
finalDF.show(10)
finalDF.printSchema()
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...