Каковы альтернативные методы для панда квантиль и вырезать в pyspark 1.6 - PullRequest
0 голосов
/ 21 февраля 2019

Я новичок в pyspark.У меня есть код панды, как показано ниже.

bindt = df[df[var].notnull()][var].quantile([0,.1,.2,.3,.4,.5,.6,.7,.8,.9,1]).unique()

df['{0}_quartile'.format(var)] = pd.cut(df[var], bindt, labels=False, include_lowest=True )

В pyspark 2.x я нашел 'приблизительный набор', но я не нашел такого в pyspark 1.6.0

Myпример ввода :

df.show ()

+-----------+----------+---------------+--------------+------------------------+
|  id       | col_1    |col_2          |col_3         |col_4                   |
+-----------+----------+---------------+--------------+------------------------+
|1.10919E+16|3988487.35|     -236751.43|    -362208.07|                0.660000|
|1.10919E+16|3988487.35|     -236751.43|    -362208.07|                0.900000|
|1.10919E+16|3988487.35|     -236751.43|    -362208.07|                0.660000|
|1.10919E+16|  36718.55|           null|          null|                0.860000|
|1.10919E+16|  36718.55|           null|          null|                0.780000|
|1.10919E+16|  36718.55|           null|          null|                0.660000|
|1.10919E+16|  36718.55|           null|          null|                0.900000|
|1.10919E+16|  36718.55|           null|          null|                0.660000|

df.collect ()

[Row(id=u'1.11312E+16', col_1=Decimal('367364.44'), col_2=Decimal('-401715.23'), col_3=Decimal('-1649917.53'), col_4=Decimal('0.080000')), 
Row(id=u'1.11312E+16', col_1=Decimal('367364.44'), col_2=Decimal('-401715.23'), col_3=Decimal('-1649917.53'), col_4=Decimal('0.780000')), 
Row(id=u'1.11312E+16', col_1=Decimal('367364.44'), col_2=Decimal('-401715.23'), col_3=Decimal('-1649917.53'), col_4=Decimal('0.780000')), 
Row(id=u'1.11312E+16', col_1=Decimal('367364.44'), col_2=Decimal('-401715.23'), col_3=Decimal('-1649917.53'), col_4=Decimal('0.860000')), 
Row(id=u'1.11312E+16', col_1=Decimal('367364.44'), col_2=Decimal('-401715.23'), col_3=Decimal('-1649917.53'), col_4=Decimal('0.330000'))]

Я должен выполнить цикл вышеупомянутой логики для всех входных данныхстолбцы.

for var in df.columns:
    bindt = df[df[var].notnull()][var].quantile([0,.1,.2,.3,.4,.5,.6,.7,.8,.9,1]).unique()    
    df['{0}_quartile'.format(var)] = pd.cut(df[var], bindt, labels=False, include_lowest=True )

Может кто-нибудь подсказать, как переписать приведенный выше код в фрейме pyspark 1.6.

Заранее спасибо

1 Ответ

0 голосов
/ 03 марта 2019

Если вы используете pyspark 2.x, вы можете использовать QuantileDiscretizer из ml lib, который использует приблизительноQuantile () и Bucketizer под капотом.

Однако, поскольку вы используете pyspark 1.6.x, вам необходимо:

1.Найти значения квантиля столбца

Значения квантиля можно найти двумя способами:

  1. Вычислить процентиль столбца с помощью вычисления процентов_ранка () и извлеките значения столбцов, значения процентилей которых близки к требуемому квантилю

  2. Следуйте методам этого ответа , который объясняет, как выполнять квантильные аппроксимациис pyspark <2.0.0 </p>

Вот мой пример реализации аппроксимирующих квантильных значений:

from pyspark.sql import functions as F
from pyspark.sql import Window

def compute_quantiles(df, col, quantiles):
  quantiles = sorted(quantiles)

  # 1. compute percentile
  df = df.withColumn("percentile", F.percent_rank().over(Window.orderBy(col)))

  # 2. categorize quantile based on the desired quantile and compute errors
  df = df.withColumn("percentile_cat1", F.lit(-1.0))
  df = df.withColumn("percentile_err1", F.lit(-1.0))
  df = df.withColumn("percentile_cat2", F.lit(-1.0))
  df = df.withColumn("percentile_err2", F.lit(-1.0))

  # check percentile with the lower boundaries
  for idx in range(0, len(quantiles)-1):
    q = quantiles[idx]
    df = df.withColumn("percentile_cat1", F\
                       .when( (F.col("percentile_cat1") == -1.0) & 
                             (F.col("percentile") <= q), q)\
                       .otherwise(F.col("percentile_cat1")))
    df = df.withColumn("percentile_err1", F\
                       .when( (F.col("percentile_err1") == -1.0) & 
                             (F.col("percentile") <= q), 
                             F.pow(F.col("percentile") - q, 2))\
                       .otherwise(F.col("percentile_err1")))

  # assign the remaining -1 values in the error to the largest squared error of 1
  df = df.withColumn("percentile_err1", F\
                     .when(F.col("percentile_err1") == -1.0, 1)\
                     .otherwise(F.col("percentile_err1")))

  # check percentile with the upper boundaries
  for idx in range(1, len(quantiles)):
    q = quantiles[idx]
    df = df.withColumn("percentile_cat2", F\
                       .when((F.col("percentile_cat2") == -1.0) & 
                             (F.col("percentile") <= q), q)\
                       .otherwise(F.col("percentile_cat2")))
    df = df.withColumn("percentile_err2",F\
                       .when((F.col("percentile_err2") == -1.0) & 
                             (F.col("percentile") <= q), 
                             F.pow(F.col("percentile") - q, 2))\
                       .otherwise(F.col("percentile_err2")))

  # assign the remaining -1 values in the error to the largest squared error of 1
  df = df.withColumn("percentile_err2", F\
                     .when(F.col("percentile_err2") == -1.0, 1)\
                     .otherwise(F.col("percentile_err2")))

  # select the nearest quantile to the percentile
  df = df.withColumn("percentile_cat", F\
                     .when(F.col("percentile_err1") < F.col("percentile_err2"), 
                           F.col("percentile_cat1"))\
                     .otherwise(F.col("percentile_cat2")))
  df = df.withColumn("percentile_err", F\
                     .when(F.col("percentile_err1") < F.col("percentile_err2"), 
                           F.col("percentile_err1"))\
                     .otherwise(F.col("percentile_err2")))

  # 3. approximate quantile values by choosing the value with the lowest error at each percentile category
  df = df.withColumn("approx_quantile", F\
                     .first(col).over(Window\
                                      .partitionBy("percentile_cat")\
                                      .orderBy(F.asc("percentile_err"))))

  return df

def extract_quantiles(df):
  df_quantiles = df.select("percentile_cat", "approx_quantile").distinct()
  rows = df_quantiles.collect()
  quantile_values = [ row.approx_quantile for row in rows ]

  return quantile_values

То, что я хотел достичь сверху, это вычисление процентиля каждой строкив столбце, и классифицировать его до ближайшего квантиля.Категоризация процентиля до ближайшего квантиля может быть выполнена путем выбора категории наименьшего квантиля, которая имеет наименьшую разницу (квадратичная ошибка) по отношению к процентилю.

1.Вычисление Percentile

Сначала я вычисляю процентиль столбца, используя cent_rank () , Оконную функцию в pyspark.Вы можете рассматривать Window как спецификацию раздела для ваших данных.Поскольку percent_rank() - это функция Window, вам нужно перейти в Window.

2.Категорируйте процентили к квантильным границам и вычисляйте ошибки

Ближайшая квантильная категория к процентилю может быть на ниже, равна или выше .Следовательно, мне нужно дважды вычислить ошибки: сначала сравнить процентиль с нижними квантильными оценками, а затем сравнить его с верхними квантильными оценками.Обратите внимание, что оператор ≤ используется для проверки, является ли процентиль меньше или равен границ.Зная прямые верхние и нижние границы квантиля процентиля, мы можем назначить процентиль ближайшей категории квантиля, выбрав либо квантиль категории ниже или равной, либо выше или равной, которая имеет наименьшую ошибку.

3.Приблизительные значения квантилей

Как только мы узнаем все ближайшие категории квантилей для каждого процентиля, мы можем затем приблизить значения квантиля: это значение имеет наименьшие ошибки в каждой категории квантиля.Эти приблизительные значения могут быть вычислены с использованием функции first() для каждого раздела категории с использованием Окно .Затем, чтобы извлечь значения квантилей, мы можем просто выбрать уникальные пары процентыCategory-приблизительноQuantileValue из фрейма данных.


После тестирования моих данных (~ 10000 строк) с помощью desired_quantiles = [0.0, 0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9, 1.0] я обнаружил, что моиПример реализации довольно близок к approxQuantile результатам.Оба значения результата становятся еще ближе, так как я уменьшаю ошибку, указанную до approxQuantile.

Использование extract_quantiles(compute_quantile(df, col, quantiles)):

enter image description here

ИспользованиеapproxQuantile:

enter image description here

2.Используйте Bucketizer

Найдя значения квантилей, вы можете использовать Bucketizer pyspark для объединения значений, основанных на квантиле.Bucketizer доступен в pyspark 1.6.x [1] [2] и 2.x [3] [4]

Вот пример того, как вы можете выполнить группирование:

from pyspark.ml.feature import Bucketizer

bucketedData = df
desired_quantiles = [0.0, 0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9, 1.0] # must be sorted

for col in df.columns:
  quantile_values = extract_quantiles(compute_quantiles(df, col, desired_quantiles))
  splits = [ boundary_values ] # replace this with quantile_values

  bucketizer = Bucketizer()\
    .setInputCol(col)\
    .setOutputCol("{}_quantile".format(col))\
    .setSplits(splits)
  bucketedData = bucketizer.transform(bucketedData)

Вы можете заменить value_boundaries значениями квантилей, найденными на шаге 1, или любым диапазоном разбиения сегментов, который вы пожелаете.Когда вы используете вешалку, весь диапазон значений столбца должен быть покрыт внутри разбиений .В противном случае значения за пределами указанных разделений будут рассматриваться как ошибки.Бесконечные значения, такие как -float("inf"), float("inf"), должны быть явно предоставлены, чтобы охватить все плавающие значения, если вы не уверены в границах значений ваших данных.

...