Здесь приведены примеры статических и динамических подходов с использованием переменной broadcast
( переменная только для чтения сохраняется в каждой памяти исполнителя; избегает передачи копии спискана машине-драйвере для каждой распределенной задачи ), чтобы получить отдельные значения столбца.Кроме того, если вы не предоставите жестко запрограммированное значение во время pivot
, оно вызовет дополнительное задание (широкое преобразование в случайном порядке), чтобы получить различные значения для этого столбца.
Отказ от ответственности => может быть лучшая альтернатива для динамического подхода с точки зрения производительности
print(spark.version)
2.4.3
import pyspark.sql.functions as F
# sample data
rawData = [(1, "a"),
(1, "b"),
(1, "c"),
(2, "a"),
(2, "b"),
(2, "c"),
(3, "a"),
(3, "b"),
(3, "c")]
df = spark.createDataFrame(rawData).toDF("id","value")
# static list example
l = ["a", "b", "c"]
l = spark.sparkContext.broadcast(l)
pivot_static_df = df\
.groupby("id")\
.pivot("value", l.value)\
.agg(F.expr("first(value)"))
pivot_static_df.show()
+---+---+---+---+
| id| a| b| c|
+---+---+---+---+
| 1| a| b| c|
| 3| a| b| c|
| 2| a| b| c|
+---+---+---+---+
# dynamic list example
v = df.select("value").distinct().rdd.flatMap(lambda x: x).collect()
v = spark.sparkContext.broadcast(v)
print(v.value)
pivot_dynamic_df = df\
.groupby("id")\
.pivot("value", l.value)\
.agg(F.expr("first(value)"))
pivot_dynamic_df.show()
['c', 'b', 'a']
+---+---+---+---+
| id| a| b| c|
+---+---+---+---+
| 1| a| b| c|
| 3| a| b| c|
| 2| a| b| c|
+---+---+---+---+