Если вы используете pyspark 2.x, вы можете использовать QuantileDiscretizer из ml lib, который использует приблизительноQuantile () и Bucketizer под капотом.
Однако, поскольку вы используете pyspark 1.6.x, вам необходимо:
1.Найти значения квантиля столбца
Значения квантиля можно найти двумя способами:
Вычислить процентиль столбца с помощью вычисления процентов_ранка () и извлеките значения столбцов, значения процентилей которых близки к требуемому квантилю
Следуйте методам этого ответа , который объясняет, как выполнять квантильные аппроксимациис pyspark <2.0.0 </p>
Вот мой пример реализации аппроксимирующих квантильных значений:
from pyspark.sql import functions as F
from pyspark.sql import Window
def compute_quantiles(df, col, quantiles):
quantiles = sorted(quantiles)
# 1. compute percentile
df = df.withColumn("percentile", F.percent_rank().over(Window.orderBy(col)))
# 2. categorize quantile based on the desired quantile and compute errors
df = df.withColumn("percentile_cat1", F.lit(-1.0))
df = df.withColumn("percentile_err1", F.lit(-1.0))
df = df.withColumn("percentile_cat2", F.lit(-1.0))
df = df.withColumn("percentile_err2", F.lit(-1.0))
# check percentile with the lower boundaries
for idx in range(0, len(quantiles)-1):
q = quantiles[idx]
df = df.withColumn("percentile_cat1", F\
.when( (F.col("percentile_cat1") == -1.0) &
(F.col("percentile") <= q), q)\
.otherwise(F.col("percentile_cat1")))
df = df.withColumn("percentile_err1", F\
.when( (F.col("percentile_err1") == -1.0) &
(F.col("percentile") <= q),
F.pow(F.col("percentile") - q, 2))\
.otherwise(F.col("percentile_err1")))
# assign the remaining -1 values in the error to the largest squared error of 1
df = df.withColumn("percentile_err1", F\
.when(F.col("percentile_err1") == -1.0, 1)\
.otherwise(F.col("percentile_err1")))
# check percentile with the upper boundaries
for idx in range(1, len(quantiles)):
q = quantiles[idx]
df = df.withColumn("percentile_cat2", F\
.when((F.col("percentile_cat2") == -1.0) &
(F.col("percentile") <= q), q)\
.otherwise(F.col("percentile_cat2")))
df = df.withColumn("percentile_err2",F\
.when((F.col("percentile_err2") == -1.0) &
(F.col("percentile") <= q),
F.pow(F.col("percentile") - q, 2))\
.otherwise(F.col("percentile_err2")))
# assign the remaining -1 values in the error to the largest squared error of 1
df = df.withColumn("percentile_err2", F\
.when(F.col("percentile_err2") == -1.0, 1)\
.otherwise(F.col("percentile_err2")))
# select the nearest quantile to the percentile
df = df.withColumn("percentile_cat", F\
.when(F.col("percentile_err1") < F.col("percentile_err2"),
F.col("percentile_cat1"))\
.otherwise(F.col("percentile_cat2")))
df = df.withColumn("percentile_err", F\
.when(F.col("percentile_err1") < F.col("percentile_err2"),
F.col("percentile_err1"))\
.otherwise(F.col("percentile_err2")))
# 3. approximate quantile values by choosing the value with the lowest error at each percentile category
df = df.withColumn("approx_quantile", F\
.first(col).over(Window\
.partitionBy("percentile_cat")\
.orderBy(F.asc("percentile_err"))))
return df
def extract_quantiles(df):
df_quantiles = df.select("percentile_cat", "approx_quantile").distinct()
rows = df_quantiles.collect()
quantile_values = [ row.approx_quantile for row in rows ]
return quantile_values
То, что я хотел достичь сверху, это вычисление процентиля каждой строкив столбце, и классифицировать его до ближайшего квантиля.Категоризация процентиля до ближайшего квантиля может быть выполнена путем выбора категории наименьшего квантиля, которая имеет наименьшую разницу (квадратичная ошибка) по отношению к процентилю.
1.Вычисление Percentile Сначала я вычисляю процентиль столбца, используя cent_rank () , Оконную функцию в pyspark.Вы можете рассматривать Window как спецификацию раздела для ваших данных.Поскольку percent_rank()
- это функция Window, вам нужно перейти в Window.
2.Категорируйте процентили к квантильным границам и вычисляйте ошибки Ближайшая квантильная категория к процентилю может быть на ниже, равна или выше .Следовательно, мне нужно дважды вычислить ошибки: сначала сравнить процентиль с нижними квантильными оценками, а затем сравнить его с верхними квантильными оценками.Обратите внимание, что оператор ≤ используется для проверки, является ли процентиль меньше или равен границ.Зная прямые верхние и нижние границы квантиля процентиля, мы можем назначить процентиль ближайшей категории квантиля, выбрав либо квантиль категории ниже или равной, либо выше или равной, которая имеет наименьшую ошибку.
3.Приблизительные значения квантилей Как только мы узнаем все ближайшие категории квантилей для каждого процентиля, мы можем затем приблизить значения квантиля: это значение имеет наименьшие ошибки в каждой категории квантиля.Эти приблизительные значения могут быть вычислены с использованием функции first()
для каждого раздела категории с использованием Окно .Затем, чтобы извлечь значения квантилей, мы можем просто выбрать уникальные пары процентыCategory-приблизительноQuantileValue из фрейма данных.
После тестирования моих данных (~ 10000 строк) с помощью desired_quantiles = [0.0, 0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9, 1.0]
я обнаружил, что моиПример реализации довольно близок к approxQuantile
результатам.Оба значения результата становятся еще ближе, так как я уменьшаю ошибку, указанную до approxQuantile
.
Использование extract_quantiles(compute_quantile(df, col, quantiles))
:
ИспользованиеapproxQuantile
:
2.Используйте Bucketizer
Найдя значения квантилей, вы можете использовать Bucketizer pyspark для объединения значений, основанных на квантиле.Bucketizer доступен в pyspark 1.6.x [1] [2] и 2.x [3] [4]
Вот пример того, как вы можете выполнить группирование:
from pyspark.ml.feature import Bucketizer
bucketedData = df
desired_quantiles = [0.0, 0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9, 1.0] # must be sorted
for col in df.columns:
quantile_values = extract_quantiles(compute_quantiles(df, col, desired_quantiles))
splits = [ boundary_values ] # replace this with quantile_values
bucketizer = Bucketizer()\
.setInputCol(col)\
.setOutputCol("{}_quantile".format(col))\
.setSplits(splits)
bucketedData = bucketizer.transform(bucketedData)
Вы можете заменить value_boundaries
значениями квантилей, найденными на шаге 1, или любым диапазоном разбиения сегментов, который вы пожелаете.Когда вы используете вешалку, весь диапазон значений столбца должен быть покрыт внутри разбиений .В противном случае значения за пределами указанных разделений будут рассматриваться как ошибки.Бесконечные значения, такие как -float("inf")
, float("inf")
, должны быть явно предоставлены, чтобы охватить все плавающие значения, если вы не уверены в границах значений ваших данных.