Преамбула
Исходя из моего опыта, я считаю, что всякий раз, когда что-то может быть реализовано с использованием встроенных функций pyspark
, это предпочтительнее, чем пользовательская функция.
Одной из проблем udf является то, что сообщения об ошибках трудно расшифровать. Например, в вашем случае, я не знаю, почему вы встречаете эту ошибку.
pyspark.sql.functions
позволяют вам делать много вещей, если вы соглашаетесь делать это в нескольких шагах. Однако, с точки зрения производительности, это будет трудно победить, потому что эти функции оптимизированы экспертами. Если то, что вы хотите сделать, невозможно сделать с помощью pyspark.sql.functions
(это происходит), я предпочитаю использовать rdd
, чем udf
. rdd
более естественны для применения Python
функций. Вы теряете производительность по сравнению со встроенным методом DataFrame
, но получаете некоторую гибкость.
Возможно, пример вашей проблемы может быть поучительным.
Python
Давайте возьмем ваш пример на основе numpy. Вы дали реализацию python
:
import numpy as np
def lognormal_skew_numpy(v):
return (np.exp(v.std()) + 2) * np.sqrt(np.exp(v.std()) - 1)
. Она может использоваться для контроля согласованности других реализаций:
print(lognormal_skew_numpy(np.array([1,3,5])))
print(lognormal_skew_numpy(np.array([5,6])))
# 14.448897615797454
# 2.938798148174726
A DataFrame API
logi c
Теперь давайте получим Spark
. Я буду использовать следующие DataFrame
:
df = spark.createDataFrame([(1, 'a'), (3, 'a'), (5, 'a'), (5,'b'), (6,'b')], ['x','period'])
df.show(2)
+---+------+
| x|period|
+---+------+
| 1| a|
| 3| a|
+---+------+
only showing top 2 rows
Функция асимметрии выполняет только основные c математические операции. Все они реализованы в pyspark.sql.functions
, поэтому в этом случае не очень сложно создать функцию, которая делает это
import pyspark.sql.functions as psf
def lognormal_skew(df, xvar = 'x'):
df_agg = (df
.groupBy('period')
.agg(psf.stddev_pop(xvar).alias('sd'))
)
df_agg = df_agg.withColumn('skew', (psf.exp(psf.col('sd')) + 2)*psf.sqrt(psf.exp('sd') - 1))
return df_agg
Обратите внимание, что существуют различные функции для вычисления стандартного отклонения в psf
: я использую stddev_pop
, который менее эффективен, но сообщает о дисперсии уровня совокупности, а не оценщике (с 3 или 2 точками точность оценки будет весьма плохой).
Мы можем контролировать это, чтобы получить желаемый результат:
lognormal_skew(df).show(2)
+------+-----------------+------------------+
|period| sd| skew|
+------+-----------------+------------------+
| b| 0.5| 2.938798148174726|
| a|1.632993161855452|14.448897615797454|
+------+-----------------+------------------+
Нам удалось получить ожидаемый результат с чистыми DataFrame
логами c.
rdd
Давайте организуем данные, чтобы иметь rdd
, который выглядит как распараллеленные numpy массивы:
rdd = df.rdd
rdd = rdd.mapValues(lambda l: l).map(lambda l: (l[1], [l[0]] )).reduceByKey(lambda x,y: x + y)
rdd.take(2)
[('b', [5, 6]), ('a', [1, 3, 5])]
Здесь мы используем reduceByKey
для сгруппировать значения в список. На этом шаге, используя объемные данные, вы можете взорвать вашу оперативную память.
Наконец, вы можете легко выполнить свою функцию параллельно с этой структурой:
rdd = rdd.map(lambda l: (l[0], np.array(l[1]))).map(lambda l: (l[0], lognormal_skew_numpy(l[1])))
rdd.take(2)
[('b', 2.938798148174726), ('a', 14.448897615797454)]
У нас снова тот же результат , Я вижу два fl aws в этом подходе:
- Он менее читабелен и переносим. Если вы хотите повторно использовать код с другим набором данных, вам придется работать больше
- Это менее эффективно (скорость и память). Операция
reduceByKey
здесь является основным узким местом.
Однако вы получаете некоторую гибкость. Это компромисс.