У меня есть фрейм данных PySpark (скажем, df
), который представляет данные временного ряда с категориальными и числовыми атрибутами. Данные собираются каждые десять минут. Ниже приведен пример того, как может выглядеть фрейм данных:
+------+-----+-------------------+-----+
| name| type| timestamp|score|
+------+-----+-------------------+-----+
| name1|type1|2012-01-10 00:00:00| 11|
| name1|type1|2012-01-10 00:00:10| 14|
| name1|type1|2012-01-10 00:00:20| 2|
| name1|type1|2012-01-10 00:00:30| 3|
| name1|type1|2012-01-10 00:00:40| 55|
| name1|type1|2012-01-10 00:00:50| 10|
| name5|type1|2012-01-10 00:01:00| 5|
| name2|type2|2012-01-10 00:01:10| 8|
| name5|type1|2012-01-10 00:01:20| 1|
|name10|type1|2012-01-10 00:01:30| 12|
|name11|type3|2012-01-10 00:01:40| null|
+------+-----+-------------------+-----+
Для заданного имени и типа я хочу сгруппировать данные и агрегировать числовые атрибуты в этом фрейме данных для каждой недели, взяв квантильзначения (скажем, 0.8
), как мой метод агрегации. Учитывая, что в необработанном информационном кадре есть значения времени, выбранные для каждых десяти минут, в идеале следует ожидать 1008 значений оценки для каждого имени за каждую неделю. Но иногда у меня отсутствуют (или null
значение) данные для оценки. I f для любого заданного имени за любую неделю, число допустимых точек данных (ненулевые или отсутствующие значения score
) меньше определенного числа (скажем, 504), я хочу игнорировать это имя для данногонеделю в моей совокупности данных. Как я могу сделать это в PySpark?
Вот что я сейчас делаю.
from pyspark.sql import Window
import pyspark.sql.functions as F
agg_expr = F.expr("percentile_approx(score, 0.8)")
df = df.groupBy(
"name",
"type",
F.window("timestamp", "1 week")
.getField("start")
.alias("aggregate_ts"),
).agg(agg_expr.alias("score"))