Как применить функцию к набору столбцов столбца данных PySpark по строкам? - PullRequest
0 голосов
/ 13 марта 2020

С учетом такого кадра данных:

   A0  A1  A2  A3
0   9   1   2   8
1   9   7   6   9
2   1   7   4   6
3   0   8   4   8
4   0   1   6   0
5   7   1   4   3
6   6   3   5   9
7   3   3   2   8
8   6   3   0   8
9   3   2   7   1

Мне нужно применить функцию к ряду столбцов строка за строкой, чтобы создать новый столбец с результатами этой функции.

Пример в Pandas:

df = pd.DataFrame(data=None, columns=['A0', 'A1', 'A2', 'A3'])
df['A0'] = np.random.randint(0, 10, 10)
df['A1'] = np.random.randint(0, 10, 10)
df['A2'] = np.random.randint(0, 10, 10)
df['A3'] = np.random.randint(0, 10, 10)

df['mean'] = df.mean(axis=1)
df['std'] = df.iloc[:, :-1].std(axis=1)
df['any'] = df.iloc[:, :-2].apply(lambda x: np.sum(x), axis=1)

И результат:

   A0  A1  A2  A3  mean       std  any
0   9   1   2   8  5.00  4.082483   20
1   9   7   6   9  7.75  1.500000   31
2   1   7   4   6  4.50  2.645751   18
3   0   8   4   8  5.00  3.829708   20
4   0   1   6   0  1.75  2.872281    7
5   7   1   4   3  3.75  2.500000   15
6   6   3   5   9  5.75  2.500000   23
7   3   3   2   8  4.00  2.708013   16
8   6   3   0   8  4.25  3.500000   17
9   3   2   7   1  3.25  2.629956   13

Как я могу сделать нечто подобное в PySpark?

Ответы [ 2 ]

3 голосов
/ 13 марта 2020

Для Spark 2.4+ вы можете использовать функцию aggregate. Сначала создайте столбцы массива values, используя все столбцы данных. Затем вычислите столбцы std, means и any следующим образом:

  • any: агрегат для суммирования элементов массива
  • mean: деление any столбец по размеру массива values
  • std: агрегат и сумма (x - mean) ** 2, а затем делим на length - 1 массива

Вот соответствующий код:

from pyspark.sql.functions import expr, sqrt, size, col, array

data = [
    (9, 1, 2, 8), (9, 7, 6, 9), (1, 7, 4, 6),
    (0, 8, 4, 8), (0, 1, 6, 0), (7, 1, 4, 3),
    (6, 3, 5, 9), (3, 3, 2, 8), (6, 3, 0, 8),
    (3, 2, 7, 1)
]
df = spark.createDataFrame(data, ['A0', 'A1', 'A2', 'A3'])

cols = df.columns

df.withColumn("values", array(*cols)) \
  .withColumn("any", expr("aggregate(values, 0D, (acc, x) -> acc + x)")) \
  .withColumn("mean", col("any") / size(col("values"))) \
  .withColumn("std", sqrt(expr("""aggregate(values, 0D, 
                                           (acc, x) -> acc + power(x - mean, 2), 
                                           acc -> acc / (size(values) -1))"""
                              )
                         )) \
  .drop("values") \
  .show(truncate=False)

#+---+---+---+---+----+----+------------------+
#|A0 |A1 |A2 |A3 |any |mean|std               |
#+---+---+---+---+----+----+------------------+
#|9  |1  |2  |8  |20.0|5.0 |4.08248290463863  |
#|9  |7  |6  |9  |31.0|7.75|1.5               |
#|1  |7  |4  |6  |18.0|4.5 |2.6457513110645907|
#|0  |8  |4  |8  |20.0|5.0 |3.8297084310253524|
#|0  |1  |6  |0  |7.0 |1.75|2.8722813232690143|
#|7  |1  |4  |3  |15.0|3.75|2.5               |
#|6  |3  |5  |9  |23.0|5.75|2.5               |
#|3  |3  |2  |8  |16.0|4.0 |2.70801280154532  |
#|6  |3  |0  |8  |17.0|4.25|3.5               |
#|3  |2  |7  |1  |13.0|3.25|2.6299556396765835|
#+---+---+---+---+----+----+------------------+

Spark <2,4 </strong>:

Вы можете использовать functools.reduce и operator.add для суммирования столбцов. Логика c остается такой же, как указано выше:

from functools import reduce
from operator import add

df.withColumn("any", reduce(add, [col(c) for c in cols])) \
  .withColumn("mean", col("any") / len(cols)) \
  .withColumn("std", sqrt(reduce(add, [(col(c) - col("mean")) ** 2 for c in cols]) / (len(cols) -1)))\
  .show(truncate=False)
1 голос
/ 15 марта 2020

Приведенный выше ответ является отличным, однако я вижу, что OP использует Python / PySpark, и если вы не понимаете выражений Spark SQL, вышеприведенные логики c не ясны на 100%.

I предложил бы использовать Pandas UDAF, в отличие от UDF, они векторизованы и очень эффективны. Это было добавлено в Spark API для снижения кривой обучения, необходимой для перехода с pandas на Spark. Это также означает, что ваш код более удобен в обслуживании, если большинство ваших коллег, таких как мой, более знакомы с Pandas / Python.

Это типы Pandas UDAF и их эквивалент Pandas

Например, SparkUdafType → df.pandasEquivalent (...) работает с → возвращает

SCALAR → df.transform (...), Серия отображений → Серии

GROUPED_MAP → df.apply (...), Group & MapDataFrame → DataFrame

GROUPED_AGG → df.aggregate (...), Уменьшить ряд → Скалярный

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...