Как повторно использовать одно и то же разбиение на несколько логических смещений windows в pyspark? - PullRequest
1 голос
/ 14 июля 2020

У меня есть фрейм данных df со схемой ниже (Spark 2.4)

root
 |-- segId: string (nullable = true)
 |-- time: timestamp (nullable = true)
 |-- val1: double (nullable = true)

, где

  • segId - это сегмент (считайте его уникальным идентификатор)
  • time - это отметка времени, когда было выполнено какое-то измерение
  • val1 - значение измерения

Мне нужно вычислить среднее (или некоторая настраиваемая агрегация) val1 по нескольким rangeBetween s. Например, я хочу вычислить среднее значение за последнюю 1 минуту, 2 минуты, ..., 100 минут для каждого сегмента.

Я не хочу создавать 100 windows (где оно разбито и отсортировано 100 раз). Я хочу создать одно физическое окно (разделить по segId и упорядочить по time один раз), а затем использовать rangeBetween в течение последних n минут (логическое смещение по сравнению с ранее разбитым набором).

Код Например, для вычисления последних 1, 2 и 3 минут:

win_physical = Window.partitionBy("segId").orderBy(F.col("time").cast("long"))

df = (
    df.repartition("segId")
    .orderBy(F.col("time").cast("long"))
    .withColumn("mean1Mins", F.mean("val1").over(win_physical.rangeBetween( -(60-1), 0)))
    .withColumn(
        "mean2Mins", F.mean("val2").over(win_physical.rangeBetween(-(2*60-1), 0))
    )
    .withColumn(
        "mean3Mins", F.mean("val1").over(win_physical.rangeBetween(-(3*60-1), 0))
    )
    .show()
)

Физический план показывает использование трех windows для приведенного выше примера

== Physical Plan ==
CollectLimit 21
+- *(6) Project [segId#0, cast(time#8 as string) AS time#102, cast(val1#2 as string) AS val1#97, cast(val2#3L as string) AS val2#98, cast(mean1Mins#63 as string) AS mean1Mins#99, cast(mean2Mins#71 as string) AS mean2Mins#100, cast(mean3Mins#80 as string) AS mean3Mins#101]
   +- Window [avg(val1#2) windowspecdefinition(segId#0, _w0#81L ASC NULLS FIRST, specifiedwindowframe(RangeFrame, -179, currentrow$())) AS mean3Mins#80], [segId#0], [_w0#81L ASC NULLS FIRST]
      +- *(5) Sort [segId#0 ASC NULLS FIRST, _w0#81L ASC NULLS FIRST], false, 0
         +- *(5) Project [segId#0, time#8, val1#2, val2#3L, mean1Mins#63, mean2Mins#71, cast(time#8 as bigint) AS _w0#81L]
            +- Window [avg(val2#3L) windowspecdefinition(segId#0, _w0#72L ASC NULLS FIRST, specifiedwindowframe(RangeFrame, -119, currentrow$())) AS mean2Mins#71], [segId#0], [_w0#72L ASC NULLS FIRST]
               +- *(4) Sort [segId#0 ASC NULLS FIRST, _w0#72L ASC NULLS FIRST], false, 0
                  +- *(4) Project [segId#0, time#8, val1#2, val2#3L, mean1Mins#63, cast(time#8 as bigint) AS _w0#72L]
                     +- Window [avg(val1#2) windowspecdefinition(segId#0, _w0#64L ASC NULLS FIRST, specifiedwindowframe(RangeFrame, -59, currentrow$())) AS mean1Mins#63], [segId#0], [_w0#64L ASC NULLS FIRST]
                        +- *(3) Sort [segId#0 ASC NULLS FIRST, _w0#64L ASC NULLS FIRST], false, 0
                           +- Exchange hashpartitioning(segId#0, 1000)
                              +- *(2) Project [segId#0, time#8, val1#2, val2#3L, cast(time#8 as bigint) AS _w0#64L]
                                 +- *(2) Sort [cast(time#8 as bigint) ASC NULLS FIRST], true, 0
                                    +- Exchange rangepartitioning(cast(time#8 as bigint) ASC NULLS FIRST, 1000)
                                       +- Exchange hashpartitioning(segId#0, 1000)
                                          +- *(1) Project [segId#0, cast(time#1 as timestamp) AS time#8, val1#2, val2#3L]
                                             +- Scan ExistingRDD[segId#0,time#1,val1#2,val2#3L]

Мой вопрос:

  1. Будет ли искроваться повторное использование одного и того же физического раздела (т. Е. Одного разделенного и упорядоченного один раз) для нескольких логических windows? Или он создаст отдельные разделы и сортировку для каждого rangeBetween (с интенсивными вычислениями)?
  2. Есть ли предложения по вычислительному улучшению вышеуказанного logi c для настраиваемых агрегатов по разным rangeBetween на одном разделе?

1 Ответ

0 голосов
/ 16 июля 2020

Будет ли искра повторно использовать один и тот же физический раздел (т. Е. Один разделенный и упорядоченный один раз) для нескольких логических windows? Или он создаст отдельные разделы и сортировку для каждого rangeBetween (с большим объемом вычислений)?

Думаю, да. Только операторы Exchange могут перераспределять данные (которые близки к источнику данных Scan ExistingRDD).

Любые предложения по вычислительному улучшению вышеуказанного logi c для настраиваемых агрегатов в разных диапазонах Между одним и тем же разделом ?

Понятия не имею. Извините.

...