Spark (2.2) Производительность <-> Spark Persist - PullRequest
0 голосов
/ 10 января 2019

У меня есть белый список (wl) пользователей и элементов, из которого я хотел бы выделить пользователей и элементы, занесенные в черный список (bl). Это делается с помощью левого анти-объединения. Оба полученных списка затем объединяются с помощью crossJoin

Проблема в том, что выполнение этого занимает вечность, даже для абсолютно минимального случая (в конечном итоге я получаю исключение нехватки памяти, даже для всего Spark Cluster) -> см. Прикрепленный код. Тем не менее, когда я делаю то же самое с помощью persist (), выполнение того же минимального случая занимает несколько секунд.

В частности:

from pyspark.sql import DataFrame, SparkSession
spark: SparkSession = SparkSession.builder.appName("dummy").getOrCreate()

# preparing dummy data
bl_i_data = [(20,), (30,), (60,)]
bl_i = spark.createDataFrame(bl_i_data, ["i_id"])
bl_u_data = [(1,), (3,), (6,)]
bl_u = spark.createDataFrame(bl_u_data, ["u_id"])
wl_u_data = [(1,), (2,), (3,), (4,), (5,)]
wl_u = spark.createDataFrame(wl_u_data, ["u_id"])
wl_i_data = [(20,), (30,), (40,), (50,), (60,)]
wl_i = spark.createDataFrame(wl_i_data, ["i_id"])

# combining wls and bls
l_u = wl_u.join(bl_u, on="u_id", how="left_anti")
l_i = wl_i.join(bl_i, on="i_id", how="left_anti")

# Takes forever to run:
u_i = l_u.crossJoin(l_i)
u_i.count()

# works fine if users and items get presisted first:
# l_u.persist()
# l_u.count()
# l_i.persist()
# l_i.count()
# u_i = l_u.crossJoin(l_i)
# u_i.count()

Есть ли у кого-нибудь хорошее объяснение относительно того, что именно происходит и / или видело такое поведение раньше? Я бы не хотел использовать persist (), так как не хочу сам управлять памятью.

1 Ответ

0 голосов
/ 12 января 2019

Вы можете посмотреть план выполнения искры, позвонив по номеру explain(). Добавьте свой код, как показано ниже.

u_i = l_u.crossJoin(l_i)
print(u_i.explain())
u_i.count()

Ниже приведено объяснение плана без и с сохранением. Вызов join в искре приводит к большим перебоям данных между исполнителями, что может привести к снижению производительности. Spark действительно пытается оптимизировать этот случайный порядок, выполняя трансляцию правого информационного кадра, если его размер ниже определенного порогового значения по умолчанию. Трансляция позволяет избежать случайного воспроизведения, поскольку все данные уже доступны для каждого исполнителя.

Когда вы сохраняете значение и считаете, фрейм данных рассчитывается заранее, и искра знает размер данных на правой стороне и может транслировать их, что позволяет избежать случайного перемешивания. В случае отсутствия сохранения, датафрейм рассчитывается на лету и перетасовывается исполнителям, что приводит к задержке.

без сохранения:

== Physical Plan ==
CartesianProduct
:- SortMergeJoin [u_id#1092L], [u_id#1090L], LeftAnti
:  :- *(1) Sort [u_id#1092L ASC NULLS FIRST], false, 0
:  :  +- Exchange hashpartitioning(u_id#1092L, 200)
:  :     +- Scan ExistingRDD[u_id#1092L]
:  +- *(2) Sort [u_id#1090L ASC NULLS FIRST], false, 0
:     +- Exchange hashpartitioning(u_id#1090L, 200)
:        +- Scan ExistingRDD[u_id#1090L]
+- SortMergeJoin [i_id#1094L], [i_id#1088L], LeftAnti
   :- *(3) Sort [i_id#1094L ASC NULLS FIRST], false, 0
   :  +- Exchange hashpartitioning(i_id#1094L, 200)
   :     +- Scan ExistingRDD[i_id#1094L]
   +- *(4) Sort [i_id#1088L ASC NULLS FIRST], false, 0
      +- Exchange hashpartitioning(i_id#1088L, 200)
         +- Scan ExistingRDD[i_id#1088L]   

с сохранением:

== Physical Plan ==
BroadcastNestedLoopJoin BuildRight, Cross
:- *(1) InMemoryTableScan [u_id#1002L]
:     +- InMemoryRelation [u_id#1002L], true, 10000, StorageLevel(disk, memory, 1 replicas)
:           +- SortMergeJoin [u_id#1002L], [u_id#1000L], LeftAnti
:              :- *(1) Sort [u_id#1002L ASC NULLS FIRST], false, 0
:              :  +- Exchange hashpartitioning(u_id#1002L, 200)
:              :     +- Scan ExistingRDD[u_id#1002L]
:              +- *(2) Sort [u_id#1000L ASC NULLS FIRST], false, 0
:                 +- Exchange hashpartitioning(u_id#1000L, 200)
:                    +- Scan ExistingRDD[u_id#1000L]
+- BroadcastExchange IdentityBroadcastMode
   +- *(2) InMemoryTableScan [i_id#1004L]
         +- InMemoryRelation [i_id#1004L], true, 10000, StorageLevel(disk, memory, 1 replicas)
               +- SortMergeJoin [i_id#1004L], [i_id#998L], LeftAnti
                  :- *(1) Sort [i_id#1004L ASC NULLS FIRST], false, 0
                  :  +- Exchange hashpartitioning(i_id#1004L, 200)
                  :     +- Scan ExistingRDD[i_id#1004L]
                  +- *(2) Sort [i_id#998L ASC NULLS FIRST], false, 0
                     +- Exchange hashpartitioning(i_id#998L, 200)
                        +- Scan ExistingRDD[i_id#998L]
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...