Что я хочу (очень упрощенно):
Набор входных данных до Набор выходных данных
Часть кода, который я пробовал:
def add_columns(cur_typ, target, value):
if cur_typ == target:
return value
return None
schema = T.StructType([T.StructField("name", T.StringType(), True),
T.StructField("typeT", T.StringType(), True),
T.StructField("value", T.IntegerType(), True)])
data = [("x", "a", 3), ("x", "b", 5), ("x", "c", 7), ("y", "a", 1), ("y", "b", 2),
("y", "c", 4), ("z", "a", 6), ("z", "b", 2), ("z", "c", 3)]
df = ctx.spark_session.createDataFrame(ctx.spark_session.sparkContext.parallelize(data), schema)
targets = [i.typeT for i in df.select("typeT").distinct().collect()]
add_columns = F.udf(add_columns)
w = Window.partitionBy('name')
for target in targets:
df = df.withColumn(target, F.max(F.lit(add_columns(df["typeT"], F.lit(target), df["value"]))).over(w))
df = df.drop("typeT", "value").dropDuplicates()
другая версия:
targets = df.select(F.collect_set("typeT").alias("typeT")).first()["typeT"]
w = Window.partitionBy('name')
for target in targets:
df = df.withColumn(target, F.max(F.lit(F.when(veh["typeT"] == F.lit(target), veh["value"])
.otherwise(None)).over(w)))
df = df.drop("typeT", "value").dropDuplicates()
Для небольших наборов данных оба работают, но у меня есть фрейм данных с 1 миллионом строк и 5000 различных типов типов. Таким образом, результатом должна быть таблица размером примерно 500 x 5000 (некоторые имена не имеют определенных typeT. Теперь я получаю ошибки переполнения стека (py4j.protocol.Py4JJavaError: Произошла ошибка при вызове o7624.withColumn.: java .lang.StackOverflowError ) пытаясь создать этот фрейм данных. Помимо увеличения размера стека, что я могу сделать? Есть ли лучший способ получить тот же результат?