Я пытаюсь отфильтровать фрейм данных Pyspark на основе списка кортежей временных отметок [(start1, stop1), (start2, stop2), ...]
. Каждый кортеж представляет временное окно. Фрейм данных Pyspark в виде следующей структуры:
+-------------------+------+
| ts| var|
+-------------------+------+
|2018-09-01 20:10:00| 0|
|2018-09-01 20:12:00| 2|
|2018-09-01 20:13:00| 1|
|2018-09-01 20:17:00| 5|
+-------------------+------+
ts - это столбец меток времени, а var - это столбец интересующей переменной. Я ищу эффективный метод для фильтрации всех строк, которые не находятся в одном из временных окон. Например, если мой список временных окон состоит из одного окна [(datetime(2018, 9, 1, 20, 11), datetime(2018, 9, 1, 20, 14))]
, отфильтрованный кадр данных должен быть
+-------------------+------+
| ts| var|
+-------------------+------+
|2018-09-01 20:12:00| 2|
|2018-09-01 20:13:00| 1|
+-------------------+------+
Мне удалось придумать рабочий фрагмент кода, используя udf и цикл for, который повторяется для каждой строки во всех временных окнах (см. Код ниже). Однако зацикливание каждой строки во всех временных окнах происходит медленно.
Некоторая дополнительная информация:
- размеры и количество временных окон заранее неизвестны, т. Е. Жесткое кодирование невозможно
- фрейм данных Pyspark обычно состоит из нескольких миллионов строк
- количество временных окон обычно составляет 100-1000
Если бы кто-то мог указать на более эффективное решение, я был бы очень признателен.
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col
from pyspark.sql.types import BooleanType
import pandas as pd
from datetime import datetime
spark = SparkSession.builder.getOrCreate()
# create Pyspark dataframe
data = {'ts': [datetime(2018, 9, 1, 20, 10), datetime(2018, 9, 1, 20, 12),
datetime(2018, 9, 1, 20, 13), datetime(2018, 9, 1, 20, 17)],
'var': [0, 2, 1, 5]}
df = spark.createDataFrame(pd.DataFrame(data))
# list of windows [(start1, stop1), (start2, stop2), ...] for filtering
windows = [(datetime(2018, 9, 1, 20, 11), datetime(2018, 9, 1, 20, 14))]
# udf for filtering
def is_in_windows_udf(windows):
def _is_in_windows(t, windows):
for ts_l, ts_h in windows:
if ts_l <= t <= ts_h:
return True
return False
return udf(lambda t: _is_in_windows(t, windows), BooleanType())
# perform actual filtering operation
df.where(is_in_windows_udf(windows)(col("ts"))).show()