Использование numpy внутри pandas udf pyspark - PullRequest
1 голос
/ 20 апреля 2020

Я пытаюсь определить pandas udf, который бы вычислял перекос логнормального распределения за период.

В настоящее время я сделал следующее:

@pandas_udf("double", PandasUDFType.GROUPED_AGG)  
def lognormal_skew(v):
  return (np.exp(v.std()) + 2) * np.sqrt(np.exp(v.std()) - 1)

my_df.groupBy('period').agg(lognormal_skew(my_df['my_columns'])).show()

Однако я получаю ошибка:

rg.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 3047.0 failed 4 times, most recent failure: Lost task 0.3 in stage 3047.0 (TID 208812, 10.139.64.8, executor 82): org.apache.spark.api.python.PythonException: Traceback (most recent call last):

Я предполагаю, что это должно что-то делать с numpy, поскольку, если я пытаюсь определить перекос следующим образом:

@pandas_udf("double", PandasUDFType.GROUPED_AGG)  
def skew(v):
  return v.skew()

my_df.groupBy('period').agg(skew(my_df['my_columns'])).show()

Он выводит DataFrame и это не ошибка.

1 Ответ

2 голосов
/ 24 апреля 2020

Преамбула

Исходя из моего опыта, я считаю, что всякий раз, когда что-то может быть реализовано с использованием встроенных функций pyspark, это предпочтительнее, чем пользовательская функция.

Одной из проблем udf является то, что сообщения об ошибках трудно расшифровать. Например, в вашем случае, я не знаю, почему вы встречаете эту ошибку.

pyspark.sql.functions позволяют вам делать много вещей, если вы соглашаетесь делать это в нескольких шагах. Однако, с точки зрения производительности, это будет трудно победить, потому что эти функции оптимизированы экспертами. Если то, что вы хотите сделать, невозможно сделать с помощью pyspark.sql.functions (это происходит), я предпочитаю использовать rdd, чем udf. rdd более естественны для применения Python функций. Вы теряете производительность по сравнению со встроенным методом DataFrame, но получаете некоторую гибкость.

Возможно, пример вашей проблемы может быть поучительным.

Python

Давайте возьмем ваш пример на основе numpy. Вы дали реализацию python:

import numpy as np
def lognormal_skew_numpy(v):
    return (np.exp(v.std()) + 2) * np.sqrt(np.exp(v.std()) - 1)

. Она может использоваться для контроля согласованности других реализаций:

print(lognormal_skew_numpy(np.array([1,3,5])))
print(lognormal_skew_numpy(np.array([5,6])))
# 14.448897615797454
# 2.938798148174726

A DataFrame API logi c

Теперь давайте получим Spark. Я буду использовать следующие DataFrame:

df = spark.createDataFrame([(1, 'a'), (3, 'a'), (5, 'a'), (5,'b'), (6,'b')], ['x','period'])
df.show(2)

+---+------+
|  x|period|
+---+------+
|  1|     a|
|  3|     a|
+---+------+
only showing top 2 rows

Функция асимметрии выполняет только основные c математические операции. Все они реализованы в pyspark.sql.functions, поэтому в этом случае не очень сложно создать функцию, которая делает это

import pyspark.sql.functions as psf

def lognormal_skew(df, xvar = 'x'):
    df_agg = (df
              .groupBy('period')
              .agg(psf.stddev_pop(xvar).alias('sd'))
             )
    df_agg = df_agg.withColumn('skew', (psf.exp(psf.col('sd')) + 2)*psf.sqrt(psf.exp('sd') - 1))
    return df_agg

Обратите внимание, что существуют различные функции для вычисления стандартного отклонения в psf: я использую stddev_pop, который менее эффективен, но сообщает о дисперсии уровня совокупности, а не оценщике (с 3 или 2 точками точность оценки будет весьма плохой).

Мы можем контролировать это, чтобы получить желаемый результат:

lognormal_skew(df).show(2)
+------+-----------------+------------------+
|period|               sd|              skew|
+------+-----------------+------------------+
|     b|              0.5| 2.938798148174726|
|     a|1.632993161855452|14.448897615797454|
+------+-----------------+------------------+

Нам удалось получить ожидаемый результат с чистыми DataFrame логами c.

rdd

Давайте организуем данные, чтобы иметь rdd, который выглядит как распараллеленные numpy массивы:

rdd = df.rdd
rdd = rdd.mapValues(lambda l: l).map(lambda l: (l[1], [l[0]] )).reduceByKey(lambda x,y: x + y)
rdd.take(2)
[('b', [5, 6]), ('a', [1, 3, 5])]

Здесь мы используем reduceByKey для сгруппировать значения в список. На этом шаге, используя объемные данные, вы можете взорвать вашу оперативную память.

Наконец, вы можете легко выполнить свою функцию параллельно с этой структурой:

rdd = rdd.map(lambda l: (l[0], np.array(l[1]))).map(lambda l: (l[0], lognormal_skew_numpy(l[1])))
rdd.take(2)
[('b', 2.938798148174726), ('a', 14.448897615797454)]

У нас снова тот же результат , Я вижу два fl aws в этом подходе:

  • Он менее читабелен и переносим. Если вы хотите повторно использовать код с другим набором данных, вам придется работать больше
  • Это менее эффективно (скорость и память). Операция reduceByKey здесь является основным узким местом.

Однако вы получаете некоторую гибкость. Это компромисс.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...