Настройка:
>>> d = [{'id': 0, 'value': 1},{'id': 1, 'value': 3},{'id': 2, 'value': 9}]
>>> df0 = spark.createDataFrame(d)
>>> df0.show()
+---+-----+
| id|value|
+---+-----+
| 0| 1|
| 1| 3|
| 2| 9|
+---+-----+
Шаг 1. Используйте функцию collect_list()
для создания массива всех значений в столбце value
и добавления этого массива в качестве столбца к исходному кадру данных
>>> from pyspark.sql.functions import *
>>> arr = df0.agg(collect_list(df.value).alias('arr_column'))
>>> df1 = df0.crossJoin(arr)
>>> df1.show()
+---+-----+-------------+
| id|value| arr_column|
+---+-----+-------------+
| 0| 1| [1, 3, 9]|
| 1| 3| [1, 3, 9]|
| 2| 9| [1, 3, 9]|
+---+-----+-------------+
Cross-join по сути транслирует массив всем исполнителям, так что следите за размером данных, к которому вы хотите применить его.(От вас также может потребоваться установить spark.sql.crossJoin.enabled=true
явно при создании контекста Spark, потому что Spark не любит перекрестные соединения именно по этой причине.)
Шаг 2: зарегистрируйте вашу функцию fu
как UDF Spark
>>> from pyspark.sql.types import *
>>> fu_udf = udf(fu, ArrayType(IntegerType()))
Шаг 3: Используйте эту UDF для увеличения элементов массива
>>> df3 = df1.withColumn('sums_in_column',fu_udf(df1.value,df1.arr_column))
>>> df3.show()
+---+-----+-------------+--------------+
| id|value| arr_column|sums_in_column|
+---+-----+-------------+--------------+
| 0| 1| [1, 3, 9]| [2, 4, 10]|
| 1| 3| [1, 3, 9]| [4, 6, 12]|
| 2| 9| [1, 3, 9]| [10, 12, 18]|
+---+-----+-------------+--------------+