Spark 1.6 DataFrame оптимизирует объединение разделов - PullRequest
0 голосов
/ 05 сентября 2018

У меня есть вопрос о разделении Spark DataFrame, в настоящее время я использую Spark 1.6 для требований проекта. Это мой фрагмент кода:

sqlContext.getConf("spark.sql.shuffle.partitions") // 6

val df = sc.parallelize(List(("A",1),("A",4),("A",2),("B",5),("C",2),("D",2),("E",2),("B",7),("C",9),("D",1))).toDF("id_1","val_1")
df.rdd.getNumPartitions // 4

val df2 = sc.parallelize(List(("B",1),("E",4),("H",2),("J",5),("C",2),("D",2),("F",2))).toDF("id_2","val_2")
df2.rdd.getNumPartitions // 4

val df3 = df.join(df2,$"id_1" === $"id_2")
df3.rdd.getNumPartitions // 6

val df4 = df3.repartition(3,$"id_1")
df4.rdd.getNumPartitions // 3

df4.explain(true)

Ниже приведен план объяснения:

== Parsed Logical Plan ==
'RepartitionByExpression ['id_1], Some(3)
+- Join Inner, Some((id_1#42 = id_2#46))
   :- Project [_1#40 AS id_1#42,_2#41 AS val_1#43]
   :  +- LogicalRDD [_1#40,_2#41], MapPartitionsRDD[169] at rddToDataFrameHolder at <console>:26
   +- Project [_1#44 AS id_2#46,_2#45 AS val_2#47]
      +- LogicalRDD [_1#44,_2#45], MapPartitionsRDD[173] at rddToDataFrameHolder at <console>:26

== Analyzed Logical Plan ==
id_1: string, val_1: int, id_2: string, val_2: int
RepartitionByExpression [id_1#42], Some(3)
+- Join Inner, Some((id_1#42 = id_2#46))
   :- Project [_1#40 AS id_1#42,_2#41 AS val_1#43]
   :  +- LogicalRDD [_1#40,_2#41], MapPartitionsRDD[169] at rddToDataFrameHolder at <console>:26
   +- Project [_1#44 AS id_2#46,_2#45 AS val_2#47]
      +- LogicalRDD [_1#44,_2#45], MapPartitionsRDD[173] at rddToDataFrameHolder at <console>:26

== Optimized Logical Plan ==
RepartitionByExpression [id_1#42], Some(3)
+- Join Inner, Some((id_1#42 = id_2#46))
   :- Project [_1#40 AS id_1#42,_2#41 AS val_1#43]
   :  +- LogicalRDD [_1#40,_2#41], MapPartitionsRDD[169] at rddToDataFrameHolder at <console>:26
   +- Project [_1#44 AS id_2#46,_2#45 AS val_2#47]
      +- LogicalRDD [_1#44,_2#45], MapPartitionsRDD[173] at rddToDataFrameHolder at <console>:26

== Physical Plan ==
TungstenExchange hashpartitioning(id_1#42,3), None
+- SortMergeJoin [id_1#42], [id_2#46]
   :- Sort [id_1#42 ASC], false, 0
   :  +- TungstenExchange hashpartitioning(id_1#42,6), None
   :     +- Project [_1#40 AS id_1#42,_2#41 AS val_1#43]
   :        +- Scan ExistingRDD[_1#40,_2#41] 
   +- Sort [id_2#46 ASC], false, 0
      +- TungstenExchange hashpartitioning(id_2#46,6), None
         +- Project [_1#44 AS id_2#46,_2#45 AS val_2#47]
            +- Scan ExistingRDD[_1#44,_2#45]

Насколько я знаю, DataFrame представляет интерфейс абстракции через RDD, поэтому разделение должно быть делегировано оптимизатору Catalyst .

Фактически по сравнению с RDD, где многие преобразования принимают параметр количества разделов, чтобы оптимизировать совместное разбиение и совместное размещение, когда это возможно, с DataFrame единственной возможностью изменить разбиение, вызывая перераспределение метода, в противном случае количество разделов для объединения и объединения определяется с использованием параметра конфигурации spark.sql.shuffle.partitions.

Из того, что я могу увидеть и понять из приведенного выше плана объяснения, кажется, что бесполезное перераспределение (так что, действительно, перемешивание) до 6 (значение по умолчанию) после повторного распределения до конечного значения, наложенного метод передела.

Я полагаю, что Оптимизатор может изменить количество разделов объединения до конечного значения 3.

Может ли кто-нибудь помочь мне прояснить этот момент? Может быть, я что-то упустил.

1 Ответ

0 голосов
/ 05 сентября 2018

Если вы используете spark sql, ваши разделы в случайном порядке всегда равны spark.sql.shufle.partitions.But, если вы включите этот spark.sql.adaptive.enabled, он добавит EchangeCoordinator. Прямо сейчас, работа этого координатора определить количество разделов после тасования для этапа, который должен извлекать данные тасования из одного или нескольких этапов.

...