Вопрос о присоединении датафреймов в Spark - PullRequest
4 голосов
/ 18 марта 2019

Предположим, у меня есть два секционированных фрейма данных:

df1 = spark.createDataFrame(
    [(x,x,x) for x in range(5)], ['key1', 'key2', 'time']
).repartition(3, 'key1', 'key2')

df2 = spark.createDataFrame(
    [(x,x,x) for x in range(7)], ['key1', 'key2', 'time']
).repartition(3, 'key1', 'key2')

(сценарий 1) Если я присоединяю их с помощью [key1, key2], операция соединения выполняется внутри каждого раздела без перемешивания (числоразделов в результирующем фрейме данных одинаков):

x = df1.join(df2, on=['key1', 'key2'], how='left')
assert x.rdd.getNumPartitions() == 3

(сценарий 2) Но если я объединю их с помощью [key1, key2, time], произойдет случайное перемешивание (количество разделовв результате dataframe равен 200, который управляется опцией spark.sql.shuffle.partitions):

x = df1.join(df2, on=['key1', 'key2', 'time'], how='left')
assert x.rdd.getNumPartitions() == 200

В то же время групповые и оконные операции с помощью [key1, key2, time] сохраняют количество разделов и выполняютбез перемешивания:

x = df1.groupBy('key1', 'key2', 'time').agg(F.count('*'))
assert x.rdd.getNumPartitions() == 3

Я не могу понять, является ли это ошибкой или есть некоторые причины для выполнения операции перемешивания во втором сценарии?И как мне избежать тасования, если это возможно?

Ответы [ 2 ]

1 голос
/ 19 марта 2019

Полагаю, удалось выяснить причину разного результата в Python и Scala.

Причина в оптимизации вещания.Если spark-shell запускается с отключенной трансляцией, и Python, и Scala работают одинаково.

./spark-shell --conf spark.sql.autoBroadcastJoinThreshold=-1

val df1 = Seq(
  (1, 1, 1)
).toDF("key1", "key2", "time").repartition(3, col("key1"), col("key2"))

val df2 = Seq(
  (1, 1, 1),
  (2, 2, 2)
).toDF("key1", "key2", "time").repartition(3, col("key1"), col("key2"))

val x = df1.join(df2, usingColumns = Seq("key1", "key2", "time"))

x.rdd.getNumPartitions == 200

Так что похоже, что spark 2.4.0 не может оптимизировать описанный случай из коробки, и расширение оптимизатора катализатора, необходимое какпредложено @ user10938362.

Кстати.Вот информация о написании расширений оптимизатора катализатора https://developer.ibm.com/code/2017/11/30/learn-extension-points-apache-spark-extend-spark-catalyst-optimizer/

1 голос
/ 19 марта 2019

Поведение Catalyst Optimizer различается для pyspark и Scala (по крайней мере, для Spark 2.4).

Я запустил оба и получил два разных плана.

В действительности вы получаете 200 разделов в pyspark,если вы явно не укажете для pyspark:

 spark.conf.set("spark.sql.shuffle.partitions", 3)

Затем обрабатываются 3 раздела, и, таким образом, 3 остаются в pyspark.

Немного удивлен, поскольку я думал бы, что под капотом это будет обычным делом.Так что люди продолжают говорить мне.Это просто показывает.

Физический план для pyspark с параметром, установленным через conf:

== Physical Plan ==
*(5) Project [key1#344L, key2#345L, time#346L]
+- SortMergeJoin [key1#344L, key2#345L, time#346L], [key1#350L, key2#351L, time#352L], LeftOuter
   :- *(2) Sort [key1#344L ASC NULLS FIRST, key2#345L ASC NULLS FIRST, time#346L ASC NULLS FIRST], false, 0
    :  +- Exchange hashpartitioning(key1#344L, key2#345L, time#346L, 3)
    :     +- *(1) Scan ExistingRDD[key1#344L,key2#345L,time#346L]
    +- *(4) Sort [key1#350L ASC NULLS FIRST, key2#351L ASC NULLS FIRST, time#352L ASC NULLS FIRST], false, 0
       +- Exchange hashpartitioning(key1#350L, key2#351L, time#352L, 3)
         +- *(3) Filter ((isnotnull(key1#350L) && isnotnull(key2#351L)) && isnotnull(time#352L))
             +- *(3) Scan ExistingRDD[key1#350L,key2#351L,time#352L]
...