Для Spark 2.4+ вы можете использовать функцию aggregate
. Сначала создайте столбцы массива values
, используя все столбцы данных. Затем вычислите столбцы std
, means
и any
следующим образом:
any
: агрегат для суммирования элементов массива mean
: деление any
столбец по размеру массива values
std
: агрегат и сумма (x - mean) ** 2
, а затем делим на length - 1
массива
Вот соответствующий код:
from pyspark.sql.functions import expr, sqrt, size, col, array
data = [
(9, 1, 2, 8), (9, 7, 6, 9), (1, 7, 4, 6),
(0, 8, 4, 8), (0, 1, 6, 0), (7, 1, 4, 3),
(6, 3, 5, 9), (3, 3, 2, 8), (6, 3, 0, 8),
(3, 2, 7, 1)
]
df = spark.createDataFrame(data, ['A0', 'A1', 'A2', 'A3'])
cols = df.columns
df.withColumn("values", array(*cols)) \
.withColumn("any", expr("aggregate(values, 0D, (acc, x) -> acc + x)")) \
.withColumn("mean", col("any") / size(col("values"))) \
.withColumn("std", sqrt(expr("""aggregate(values, 0D,
(acc, x) -> acc + power(x - mean, 2),
acc -> acc / (size(values) -1))"""
)
)) \
.drop("values") \
.show(truncate=False)
#+---+---+---+---+----+----+------------------+
#|A0 |A1 |A2 |A3 |any |mean|std |
#+---+---+---+---+----+----+------------------+
#|9 |1 |2 |8 |20.0|5.0 |4.08248290463863 |
#|9 |7 |6 |9 |31.0|7.75|1.5 |
#|1 |7 |4 |6 |18.0|4.5 |2.6457513110645907|
#|0 |8 |4 |8 |20.0|5.0 |3.8297084310253524|
#|0 |1 |6 |0 |7.0 |1.75|2.8722813232690143|
#|7 |1 |4 |3 |15.0|3.75|2.5 |
#|6 |3 |5 |9 |23.0|5.75|2.5 |
#|3 |3 |2 |8 |16.0|4.0 |2.70801280154532 |
#|6 |3 |0 |8 |17.0|4.25|3.5 |
#|3 |2 |7 |1 |13.0|3.25|2.6299556396765835|
#+---+---+---+---+----+----+------------------+
Spark <2,4 </strong>:
Вы можете использовать functools.reduce
и operator.add
для суммирования столбцов. Логика c остается такой же, как указано выше:
from functools import reduce
from operator import add
df.withColumn("any", reduce(add, [col(c) for c in cols])) \
.withColumn("mean", col("any") / len(cols)) \
.withColumn("std", sqrt(reduce(add, [(col(c) - col("mean")) ** 2 for c in cols]) / (len(cols) -1)))\
.show(truncate=False)