Фильтрация строк в фрейме данных PySpark с использованием нескольких окон - PullRequest
0 голосов
/ 02 сентября 2018

Я пытаюсь отфильтровать фрейм данных Pyspark на основе списка кортежей временных отметок [(start1, stop1), (start2, stop2), ...]. Каждый кортеж представляет временное окно. Фрейм данных Pyspark в виде следующей структуры:

+-------------------+------+
|                 ts|   var|
+-------------------+------+
|2018-09-01 20:10:00|     0|
|2018-09-01 20:12:00|     2|
|2018-09-01 20:13:00|     1|
|2018-09-01 20:17:00|     5|
+-------------------+------+

ts - это столбец меток времени, а var - это столбец интересующей переменной. Я ищу эффективный метод для фильтрации всех строк, которые не находятся в одном из временных окон. Например, если мой список временных окон состоит из одного окна [(datetime(2018, 9, 1, 20, 11), datetime(2018, 9, 1, 20, 14))], отфильтрованный кадр данных должен быть

+-------------------+------+ 
|                 ts|   var| 
+-------------------+------+ 
|2018-09-01 20:12:00|     2| 
|2018-09-01 20:13:00|     1|
+-------------------+------+ 

Мне удалось придумать рабочий фрагмент кода, используя udf и цикл for, который повторяется для каждой строки во всех временных окнах (см. Код ниже). Однако зацикливание каждой строки во всех временных окнах происходит медленно.

Некоторая дополнительная информация:

  • размеры и количество временных окон заранее неизвестны, т. Е. Жесткое кодирование невозможно
  • фрейм данных Pyspark обычно состоит из нескольких миллионов строк
  • количество временных окон обычно составляет 100-1000

Если бы кто-то мог указать на более эффективное решение, я был бы очень признателен.

from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col
from pyspark.sql.types import BooleanType
import pandas as pd
from datetime import datetime

spark = SparkSession.builder.getOrCreate()

# create Pyspark dataframe
data = {'ts': [datetime(2018, 9, 1, 20, 10), datetime(2018, 9, 1, 20, 12),
               datetime(2018, 9, 1, 20, 13), datetime(2018, 9, 1, 20, 17)],
         'var': [0, 2, 1, 5]}
df = spark.createDataFrame(pd.DataFrame(data))

# list of windows [(start1, stop1), (start2, stop2), ...] for filtering
windows = [(datetime(2018, 9, 1, 20, 11), datetime(2018, 9, 1, 20, 14))]

# udf for filtering
def is_in_windows_udf(windows):
    def _is_in_windows(t, windows):
        for ts_l, ts_h in windows:
            if ts_l <= t <= ts_h:
                return True
            return False
    return udf(lambda t: _is_in_windows(t, windows), BooleanType())

# perform actual filtering operation
df.where(is_in_windows_udf(windows)(col("ts"))).show()

1 Ответ

0 голосов
/ 02 сентября 2018

Более простое решение может быть приведено ниже, и поскольку мы выполняем объединение одного и того же набора данных, чтобы оно также распараллеливало выполнение:

for count, item in enumerate(windows):
    if count == 0:
        result = df.filter(
            (F.col("ts")<= item[1]) &
            (F.col("ts")>= item[0])
        )
    else:
        result = result.union(
            df.filter(
            (F.col("ts")<= item[1]) &
            (F.col("ts")>= item[0])
            )
        )
...