совокупность искровых событий на ключ, включая их временные метки изменения - PullRequest
0 голосов
/ 25 марта 2020

Для кадра данных:

+----+--------+-------------------+----+
|user|      dt|         time_value|item|
+----+--------+-------------------+----+
| id1|20200101|2020-01-01 00:00:00|   A|
| id1|20200101|2020-01-01 10:00:00|   B|
| id1|20200101|2020-01-01 09:00:00|   A|
| id1|20200101|2020-01-01 11:00:00|   B|
+----+--------+-------------------+----+

Я хочу захватить все уникальные элементы, например collect_set, но сохранить свои собственные time_value

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.functions.unix_timestamp
import org.apache.spark.sql.functions.collect_set
import org.apache.spark.sql.types.TimestampType
val timeFormat = "yyyy-MM-dd HH:mm"
val dx = Seq(("id1", "20200101", "2020-01-01 00:00", "A"), ("id1", "20200101","2020-01-01 10:00", "B"), ("id1", "20200101","2020-01-01 9:00", "A"), ("id1", "20200101","2020-01-01 11:00", "B")).toDF("user", "dt","time_value", "item").withColumn("time_value", unix_timestamp(col("time_value"), timeFormat).cast(TimestampType))
dx.show

A

dx.groupBy("user", "dt").agg(collect_set("item")).show
+----+--------+-----------------+                                               
|user|      dt|collect_set(item)|
+----+--------+-----------------+
| id1|20200101|           [B, A]|
+----+--------+-----------------+

не сохраняет информацию time_value, когда сигнал переключается с A на B. Как я могу сохранить информацию о значении времени для каждого набора в элементе?

Возможно ли иметь коллекцию в пределах оконной функции для достижения желаемого результата? В настоящее время я могу думать только о том, чтобы:

  1. использовать оконную функцию для определения пар событий
  2. фильтр для изменения событий
  3. агрегат

, который необходимо перемешать несколько раз. Альтернативно, UDF был бы возможен (collect_list(sort_array(struct(time_value, item)))), но это также кажется довольно неуклюжим.

Есть ли лучший способ?

1 Ответ

2 голосов
/ 25 марта 2020

Я бы действительно использовал оконные функции, чтобы изолировать точки изменения, я думаю, что нет альтернатив:

val win = Window.partitionBy($"user",$"dt").orderBy($"time_value")

dx
.orderBy($"time_value")
.withColumn("item_change_post",coalesce((lag($"item",1).over(win)=!=$"item"),lit(false)))
.withColumn("item_change_pre",lead($"item_change_post",1).over(win))
.where($"item_change_pre" or $"item_change_post")
.show()

+----+--------+-------------------+----+----------------+---------------+
|user|      dt|         time_value|item|item_change_post|item_change_pre|
+----+--------+-------------------+----+----------------+---------------+
| id1|20200101|2020-01-01 09:00:00|   A|           false|           true|
| id1|20200101|2020-01-01 10:00:00|   B|            true|          false|
+----+--------+-------------------+----+----------------+---------------+

, тогда используйте что-то вроде groupBy($"user",$"dt").agg(collect_list(struct($"time_value",$"item")))

Я не Подумайте, что происходит несколько перемешиваний, потому что вы всегда разбиваете / группируете по одним и тем же ключам.

Вы можете попытаться сделать его более эффективным, агрегируя ваш начальный кадр данных в мин / макс time_value для каждого item, затем сделайте то же самое, что и выше.

...