У меня есть фрейм данных с датами, ID (скажем, для города) и двумя столбцами температур (в моем реальном фрейме данных у меня есть дюжина столбцов для вычисления).
Я хочу "оценить"эти температуры для данного окна. Я хочу, чтобы этот рейтинг масштабировался от 0 (самая низкая температура окна) до 100 (самая высокая температура для того же окна). Окно должно быть симметричным (то есть принимать во внимание столько дней, сколько дней). Мой тестовый фрейм данных выглядит так:
+-----------+-------+-----------------+-----------------+
|DATE_TICKET|ID_SITE|MAX_TEMPERATURE_C|MIN_TEMPERATURE_C|
+-----------+-------+-----------------+-----------------+
| 2017-03-24| 001| 22| 10|
| 2017-03-25| 001| 25| 15|
| 2017-03-26| 001| 31| 19|
| 2017-03-27| 001| 29| 18|
| 2017-03-28| 001| 30| 16|
| 2017-03-29| 001| 25| 17|
| 2017-03-30| 001| 24| 16|
| 2017-03-24| 002| 18| 12|
| 2017-03-25| 002| 17| 11|
| 2017-03-27| 002| 15| 7|
| 2017-03-28| 002| 12| 5|
| 2017-03-29| 002| 8| 3|
| 2017-03-30| 002| 10| 1|
| 2017-03-31| 002| 15| 4|
| 2017-03-24| 003| 18| 7|
| 2017-03-26| 003| 22| 11|
| 2017-03-27| 003| 27| 12|
| 2017-03-28| 003| 29| 15|
| 2017-04-01| 003| 31| 16|
| 2017-04-04| 003| 34| 22|
+-----------+-------+-----------------+-----------------+
Чтобы воссоздать мои данные, вы можете использовать этот код:
data = {'DATE_TICKET': ['2017-03-24','2017-03-25','2017-03-26','2017-03-27','2017-03-28','2017-03-29','2017-03-30',
'2017-03-24','2017-03-25','2017-03-27','2017-03-28','2017-03-29','2017-03-30','2017-03-31',
'2017-03-24','2017-03-26','2017-03-27','2017-03-28','2017-04-01','2017-04-04'],
'ID_SITE': ['001','001','001','001','001','001','001','002','002','002','002','002','002','002','003','003','003','003','003','003'],
'MAX_TEMPERATURE_C': [22,25,31,29,30,25,24,18,17,15,12,8,10,15,18,22,27,29,31,34],
'MIN_TEMPERATURE_C' : [10,15,19,18,16,17,16,12,11,7,5,3,1,4,7,11,12,15,16,22]}
df = pd.DataFrame(data)
ddf = ctx.createDataFrame(df)
ddf = ddf.withColumn('DATE_TICKET', ddf['DATE_TICKET'].cast('date'))
На данный момент мой код выглядит так:
import pandas as pd
import pyspark
import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql.window import Window
from pyspark.sql.types import FloatType
window_size = 2
target = int((window_size)-0.5)
w = Window.partitionBy("ID_SITE").orderBy("DATE_TICKET").rowsBetween(-(window_size), window_size)
median_udf = F.udf(lambda x: float(np.median(x)), FloatType())
rank_udf = F.udf(lambda x: pd.cut(x, 101, include_lowest=True, labels=list(range(0,101)))[target])
ddf.withColumn("list", F.collect_list("MAX_TEMPERATURE_C").over(w)) \
.withColumn("rolling_median", median_udf("list")).show(truncate = False)
Это работает с функцией 'median_udf' (кстати, я скопировал / вставил из другого поста в stackoverflow). Но эта функция не делает то, что я ожидаю.
Я хочу использовать функцию rank_udf, которая прекрасно работает, когда я применяю ее в одном списке. Он ранжирует все значения для данного окна и возвращает одно значение, то есть среднее, которое мне интересно.
Например:
data = [22,25,31,29,31,34,26,21]
target = int((len(data)/2)-0.5)
pd.cut(data, 101, include_lowest=True, labels=list(range(0,101)))[target]
Но:
- Прежде всего этот возвращает ошибку, когда я использую его как udf в pyspark.
- Даже если ошибки не было, я использую функцию Pandas и хочу бытьсмог сделать это без использования библиотеки pandas, потому что я работаю на сотнях миллионов строк и мне нужна производительность.
Я пытался использовать такие функции, как Bucketizer или QuantileDiscretizer из pyspark.ml.feature, но яне могу заставить их работать ...
(PS: да, я знаю, что это не очень процентиль, потому что я использую 101 лот вместо 100)
(PPS: Iотредактирую этот пост, если вам нужно больше контекста / информации).