Вы можете посмотреть план выполнения искры, позвонив по номеру explain()
. Добавьте свой код, как показано ниже.
u_i = l_u.crossJoin(l_i)
print(u_i.explain())
u_i.count()
Ниже приведено объяснение плана без и с сохранением. Вызов join
в искре приводит к большим перебоям данных между исполнителями, что может привести к снижению производительности. Spark действительно пытается оптимизировать этот случайный порядок, выполняя трансляцию правого информационного кадра, если его размер ниже определенного порогового значения по умолчанию. Трансляция позволяет избежать случайного воспроизведения, поскольку все данные уже доступны для каждого исполнителя.
Когда вы сохраняете значение и считаете, фрейм данных рассчитывается заранее, и искра знает размер данных на правой стороне и может транслировать их, что позволяет избежать случайного перемешивания.
В случае отсутствия сохранения, датафрейм рассчитывается на лету и перетасовывается исполнителям, что приводит к задержке.
без сохранения:
== Physical Plan ==
CartesianProduct
:- SortMergeJoin [u_id#1092L], [u_id#1090L], LeftAnti
: :- *(1) Sort [u_id#1092L ASC NULLS FIRST], false, 0
: : +- Exchange hashpartitioning(u_id#1092L, 200)
: : +- Scan ExistingRDD[u_id#1092L]
: +- *(2) Sort [u_id#1090L ASC NULLS FIRST], false, 0
: +- Exchange hashpartitioning(u_id#1090L, 200)
: +- Scan ExistingRDD[u_id#1090L]
+- SortMergeJoin [i_id#1094L], [i_id#1088L], LeftAnti
:- *(3) Sort [i_id#1094L ASC NULLS FIRST], false, 0
: +- Exchange hashpartitioning(i_id#1094L, 200)
: +- Scan ExistingRDD[i_id#1094L]
+- *(4) Sort [i_id#1088L ASC NULLS FIRST], false, 0
+- Exchange hashpartitioning(i_id#1088L, 200)
+- Scan ExistingRDD[i_id#1088L]
с сохранением:
== Physical Plan ==
BroadcastNestedLoopJoin BuildRight, Cross
:- *(1) InMemoryTableScan [u_id#1002L]
: +- InMemoryRelation [u_id#1002L], true, 10000, StorageLevel(disk, memory, 1 replicas)
: +- SortMergeJoin [u_id#1002L], [u_id#1000L], LeftAnti
: :- *(1) Sort [u_id#1002L ASC NULLS FIRST], false, 0
: : +- Exchange hashpartitioning(u_id#1002L, 200)
: : +- Scan ExistingRDD[u_id#1002L]
: +- *(2) Sort [u_id#1000L ASC NULLS FIRST], false, 0
: +- Exchange hashpartitioning(u_id#1000L, 200)
: +- Scan ExistingRDD[u_id#1000L]
+- BroadcastExchange IdentityBroadcastMode
+- *(2) InMemoryTableScan [i_id#1004L]
+- InMemoryRelation [i_id#1004L], true, 10000, StorageLevel(disk, memory, 1 replicas)
+- SortMergeJoin [i_id#1004L], [i_id#998L], LeftAnti
:- *(1) Sort [i_id#1004L ASC NULLS FIRST], false, 0
: +- Exchange hashpartitioning(i_id#1004L, 200)
: +- Scan ExistingRDD[i_id#1004L]
+- *(2) Sort [i_id#998L ASC NULLS FIRST], false, 0
+- Exchange hashpartitioning(i_id#998L, 200)
+- Scan ExistingRDD[i_id#998L]