Spark - Как объединить строки из окна со значением шага - PullRequest
0 голосов
/ 11 января 2020

У меня есть следующий фрейм данных (данные временного ряда):

    value category
    a1    c1
    a2    c1
    a3    c1
    a4    c1
    a5    c1
    a6    c1
    a7    c1
    a8    c1
    a1    c2
    a2    c2
    a3    c2
    a4    c2
    a5    c2
    a6    c2
    a7    c2
    a8    c2

Я хочу использовать скользящее окно, то есть с размером окна = 4 и шагом = 2, что означает окно содержит 4 строки, и мы перемещаем окно на 2 строки. ожидаемый результат должен быть таким:

    window value      category
    [a1, a2, a3, a4]    c1
    [a3, a4, a5, a6]    c1
    [a5, a6, a7, a8]    c1
    [a1, a2, a3, a4]    c2
    [a3, a4, a5, a6]    c2
    [a5, a6, a7, a8]    c2

Я пробовал с оконной функцией. Однако, насколько мне известно, окно будет перебирать все строки внутри моего Dataframe. Пример исходного кода:

# define the window spec with 4 rows following        
windowSpec = Window.partitionBy(col("category").orderBy(col("value ")).rowsBetween(0, 3)  
# get the window data
window_data = df.withColumn('window_data',collect_list(col("value")).over(windowSpec))

Таким образом, результат будет выглядеть следующим образом:

    window_data      category
    [a1, a2, a3, a4]    c1
    [a2, a3, a4, a5]    c1
    [a3, a4, a5, a6]    c1
    [a4, a5, a6, a7]    c1
    ...

Обновление: на самом деле, мы могли бы объединить все строки в окне для каждой строки в кадре данных , затем отфильтруйте только несколько строк в определенных c позициях. Но это кажется дорогостоящим, поскольку нам нужно сделать две итерации для всех строк в кадре данных, и нам нужно объединить те, которые мы проигнорируем позже. Интуитивно, я думаю, мы могли бы иметь более оптимизированный вариант.

Не могли бы вы, ребята, порекомендовать какие-либо методы, чтобы получить желаемый результат?

Заранее спасибо: -)

1 Ответ

0 голосов
/ 11 января 2020

Труонг,

Вот код scala (очень похожий на python):

val window = Window.partitionBy(col("category")).orderBy("value")
val window_data = df.withColumn("window_data", collect_list(col("value")).over(window.rowsBetween(0,3)))
    .withColumn("rownum", row_number().over(window))
    .where(pmod($"rownum", lit(2))===1 && size(col("window_data")) === 4)
    .drop("rownum")

window_data.show(false)

Вывод:

+-----+--------+----------------+
|value|category|window_data     |
+-----+--------+----------------+
|a1   |c1      |[a1, a2, a3, a4]|
|a3   |c1      |[a3, a4, a5, a6]|
|a5   |c1      |[a5, a6, a7, a8]|
|a1   |c2      |[a1, a2, a3, a4]|
|a3   |c2      |[a3, a4, a5, a6]|
|a5   |c2      |[a5, a6, a7, a8]|
+-----+--------+----------------+

Идея состоит в том, чтобы вычислить положение каждой строки в том же окне, что и поле window_data, и оставить только нечетные строки. Кроме того, мы будем удалять строки, для которых мы не найдем 4 элемента в window_data (вместо добавления нулей).

Редактировать : Вот план запроса. Он показывает, что для вычисления столбцов rownum и window_data используется одно окно (без дополнительной случайной обработки / сортировки).

== Physical Plan ==
*(2) Project [value#5, category#6, window_data#10]
+- *(2) Filter ((isnotnull(rownum#15) && (pmod(rownum#15, 2) = 1)) && (size(window_data#10) = 4))
   +- Window [collect_list(value#5, 0, 0) windowspecdefinition(category#6, value#5 ASC NULLS FIRST, specifiedwindowframe(RowFrame, currentrow$(), 3)) AS window_data#10, row_number() windowspecdefinition(category#6, value#5 ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS rownum#15], [category#6], [value#5 ASC NULLS FIRST]
      +- *(1) Sort [category#6 ASC NULLS FIRST, value#5 ASC NULLS FIRST], false, 0
         +- Exchange hashpartitioning(category#6, 200)
            +- LocalTableScan [value#5, category#6]
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...