Как улучшить искровую производительность для большого метода codegen, который включает в себя много полей? - PullRequest
0 голосов
/ 12 июня 2019

Я хотел бы сделать сумму агрегации массива с размером массива 100, вот моя грубая сила. (Я знаю, что могу написать скалярный udaf, но я бы хотел довести кодекс искры до предела.

Spark v2.4

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession \
    .builder \
    .master('local[10]') \
    .appName('Notebook') \
    .config('spark.sql.codegen.maxFields', '10000') \
    .config('spark.sql.codegen.methodSplitThreshold', '100000') \
    .getOrCreate()

df = spark.range(2969622).select(F.array(*[F.rand() for i in range(100)]).alias('v')).cache()
df.count()
# Trigger cache.

# The followings are equivalent to
# SELECT
#     ARRAY(SUM(v[0]), SUM(v[1]),...) as v
# FROM ...

df.agg(
    F.array(*[F.sum(F.col('v')[i]) for i in range(0, 30)]).alias('v'),
).show()
# 357 ms

df.agg(
    F.array(*[F.sum(F.col('v')[i]) for i in range(0, 40)]).alias('v'),
).show()
# 5.51 sec

Кажется, есть такой порог, что, как только я его достигаю, производительность сильно ухудшается.

Итак, я закончил этим

a = df.agg(
    *[F.sum(F.col("v")[i]) for i in range(0, 25)],
)
b = df.agg(
    *[F.sum(F.col("v")[i]) for i in range(25, 50)],
)
c = df.agg(
    *[F.sum(F.col("v")[i]) for i in range(50, 75)],
)
d = df.agg(
    *[F.sum(F.col("v")[i]) for i in range(75, 100)],
)
x = a.crossJoin(b).crossJoin(c).crossJoin(d).selectExpr('array(*) as v').toPandas()
# 2.4 sec

Интересно, есть ли в spark конфиг, управляющий оптимизацией spark codegen в этом случае? Возможно, я смогу немного расслабиться, чтобы улучшить производительность в этом конкретном примере.

Я безуспешно пытался spark.sql.codegen.maxFields & spark.sql.codegen.methodSplitThreshold.

...