У меня есть фрейм данных df
со схемой ниже (Spark 2.4)
root
|-- segId: string (nullable = true)
|-- time: timestamp (nullable = true)
|-- val1: double (nullable = true)
, где
segId
- это сегмент (считайте его уникальным идентификатор) time
- это отметка времени, когда было выполнено какое-то измерение val1
- значение измерения
Мне нужно вычислить среднее (или некоторая настраиваемая агрегация) val1
по нескольким rangeBetween
s. Например, я хочу вычислить среднее значение за последнюю 1 минуту, 2 минуты, ..., 100 минут для каждого сегмента.
Я не хочу создавать 100 windows (где оно разбито и отсортировано 100 раз). Я хочу создать одно физическое окно (разделить по segId
и упорядочить по time
один раз), а затем использовать rangeBetween
в течение последних n минут (логическое смещение по сравнению с ранее разбитым набором).
Код Например, для вычисления последних 1, 2 и 3 минут:
win_physical = Window.partitionBy("segId").orderBy(F.col("time").cast("long"))
df = (
df.repartition("segId")
.orderBy(F.col("time").cast("long"))
.withColumn("mean1Mins", F.mean("val1").over(win_physical.rangeBetween( -(60-1), 0)))
.withColumn(
"mean2Mins", F.mean("val2").over(win_physical.rangeBetween(-(2*60-1), 0))
)
.withColumn(
"mean3Mins", F.mean("val1").over(win_physical.rangeBetween(-(3*60-1), 0))
)
.show()
)
Физический план показывает использование трех windows для приведенного выше примера
== Physical Plan ==
CollectLimit 21
+- *(6) Project [segId#0, cast(time#8 as string) AS time#102, cast(val1#2 as string) AS val1#97, cast(val2#3L as string) AS val2#98, cast(mean1Mins#63 as string) AS mean1Mins#99, cast(mean2Mins#71 as string) AS mean2Mins#100, cast(mean3Mins#80 as string) AS mean3Mins#101]
+- Window [avg(val1#2) windowspecdefinition(segId#0, _w0#81L ASC NULLS FIRST, specifiedwindowframe(RangeFrame, -179, currentrow$())) AS mean3Mins#80], [segId#0], [_w0#81L ASC NULLS FIRST]
+- *(5) Sort [segId#0 ASC NULLS FIRST, _w0#81L ASC NULLS FIRST], false, 0
+- *(5) Project [segId#0, time#8, val1#2, val2#3L, mean1Mins#63, mean2Mins#71, cast(time#8 as bigint) AS _w0#81L]
+- Window [avg(val2#3L) windowspecdefinition(segId#0, _w0#72L ASC NULLS FIRST, specifiedwindowframe(RangeFrame, -119, currentrow$())) AS mean2Mins#71], [segId#0], [_w0#72L ASC NULLS FIRST]
+- *(4) Sort [segId#0 ASC NULLS FIRST, _w0#72L ASC NULLS FIRST], false, 0
+- *(4) Project [segId#0, time#8, val1#2, val2#3L, mean1Mins#63, cast(time#8 as bigint) AS _w0#72L]
+- Window [avg(val1#2) windowspecdefinition(segId#0, _w0#64L ASC NULLS FIRST, specifiedwindowframe(RangeFrame, -59, currentrow$())) AS mean1Mins#63], [segId#0], [_w0#64L ASC NULLS FIRST]
+- *(3) Sort [segId#0 ASC NULLS FIRST, _w0#64L ASC NULLS FIRST], false, 0
+- Exchange hashpartitioning(segId#0, 1000)
+- *(2) Project [segId#0, time#8, val1#2, val2#3L, cast(time#8 as bigint) AS _w0#64L]
+- *(2) Sort [cast(time#8 as bigint) ASC NULLS FIRST], true, 0
+- Exchange rangepartitioning(cast(time#8 as bigint) ASC NULLS FIRST, 1000)
+- Exchange hashpartitioning(segId#0, 1000)
+- *(1) Project [segId#0, cast(time#1 as timestamp) AS time#8, val1#2, val2#3L]
+- Scan ExistingRDD[segId#0,time#1,val1#2,val2#3L]
Мой вопрос:
- Будет ли искроваться повторное использование одного и того же физического раздела (т. Е. Одного разделенного и упорядоченного один раз) для нескольких логических windows? Или он создаст отдельные разделы и сортировку для каждого
rangeBetween
(с интенсивными вычислениями)? - Есть ли предложения по вычислительному улучшению вышеуказанного logi c для настраиваемых агрегатов по разным
rangeBetween
на одном разделе?