Предположим, у меня есть два секционированных фрейма данных:
df1 = spark.createDataFrame(
[(x,x,x) for x in range(5)], ['key1', 'key2', 'time']
).repartition(3, 'key1', 'key2')
df2 = spark.createDataFrame(
[(x,x,x) for x in range(7)], ['key1', 'key2', 'time']
).repartition(3, 'key1', 'key2')
(сценарий 1) Если я присоединяю их с помощью [key1, key2], операция соединения выполняется внутри каждого раздела без перемешивания (числоразделов в результирующем фрейме данных одинаков):
x = df1.join(df2, on=['key1', 'key2'], how='left')
assert x.rdd.getNumPartitions() == 3
(сценарий 2) Но если я объединю их с помощью [key1, key2, time], произойдет случайное перемешивание (количество разделовв результате dataframe равен 200, который управляется опцией spark.sql.shuffle.partitions):
x = df1.join(df2, on=['key1', 'key2', 'time'], how='left')
assert x.rdd.getNumPartitions() == 200
В то же время групповые и оконные операции с помощью [key1, key2, time] сохраняют количество разделов и выполняютбез перемешивания:
x = df1.groupBy('key1', 'key2', 'time').agg(F.count('*'))
assert x.rdd.getNumPartitions() == 3
Я не могу понять, является ли это ошибкой или есть некоторые причины для выполнения операции перемешивания во втором сценарии?И как мне избежать тасования, если это возможно?