Для Spark 2.4+ вы можете использовать функцию агрегат и выполнять вычисления за один шаг:
from pyspark.sql.functions import expr
# I adjusted the 2nd array-item in id=1 from 2.0 to 2.1 so there is no `2.0` when id=1
df = spark.createDataFrame([(1,[0.2, 2.1, 3., 4., 3., 0.5]),(2,[7., 0.3, 0.3, 8., 2.,])],['id','column'])
df.withColumn('data', expr("""
/* ArrayType argument */
/* zero: set empty array to initialize acc */
/* merge: iterate through `column` and reduce based on the values of y and the array indices of acc */
(acc, y) ->
WHEN y < 2.0 THEN array(IFNULL(acc[0],0) + y, acc[1], acc[2])
WHEN y > 2.0 THEN array(acc[0], IFNULL(acc[1],0) + y, acc[2])
ELSE array(acc[0], acc[1], IFNULL(acc[2],0) + y)
/* finish: to convert the array into a named_struct */
acc -> (acc[0] as `column<2`, acc[1] as `column>2`, acc[2] as `column=2`)
""")).selectExpr('id', 'data.*').show()
#| id|column<2|column>2|column=2|
#| 1| 0.7| 12.1| null|
#| 2| 0.6| 15.0| 2.0|
До Spark 2.4 функциональная поддержка ArrayType ограничена, вы может сделать это с помощью explode и затем groupby + pivot:
from pyspark.sql.functions import sum as fsum, expr
df.selectExpr('id', 'explode_outer(column) as item') \
.withColumn('g', expr('if(item < 2, "column<2", if(item > 2, "column>2", "column=2"))')) \
.groupby('id') \
.pivot('g', ["column<2", "column>2", "column=2"]) \
.agg(fsum('item')) \
#| id|column<2|column>2|column=2|
#| 1| 0.7| 12.1| null|
#| 2| 0.6| 15.0| 2.0|
В случае, если explode
медленный (т. е. SPARK-21657 , показанный до Spark 2.3), используйте UDF:
from pyspark.sql.functions import udf
from pyspark.sql.types import StructType, StructField, DoubleType
schema = StructType([
StructField("column>2", DoubleType()),
StructField("column<2", DoubleType()),
StructField("column=2", DoubleType())
def split_data(arr):
d = {}
if arr is None: arr = []
for y in arr:
if y > 2:
d['column>2'] = d.get('column>2',0) + y
elif y < 2:
d['column<2'] = d.get('column<2',0) + y
d['column=2'] = d.get('column=2',0) + y
return d
udf_split_data = udf(split_data, schema)
df.withColumn('data', udf_split_data('column')).selectExpr('id', 'data.*').show()