Spark повторного использования трансляции DF - PullRequest
2 голосов
/ 07 июля 2019

Я хотел бы повторно использовать мой DataFrame (не возвращаясь к этому с помощью функции «Карта» в RDD / Dataset), который я помечал как широковещательный, но, похоже, Spark продолжает передавать его снова и снова.

Наличие таблицы «банк» (тестовая таблица). Я выполняю следующее:

  val cachedDf = spark.sql("select * from bank").cache
  cachedDf.count

  val dfBroadcasted = broadcast(cachedDf)

  val dfNormal = spark.sql("select * from bank")

  dfNormal.join(dfBroadcasted, List("age"))
    .join(dfBroadcasted, List("age")).count

Я кеширую раньше на всякий случай, если это что-то изменило, но то же самое с или без.

Если я выполню приведенный выше код, я увижу следующий план SQL:

SQL plan for the code

Как вы можете видеть, мой вещаемый DF транслируется ДВАЖДЫ также с разными таймингами (если я добавлю больше действий потом, они тоже будут транслироваться снова).

Я забочусь об этом, потому что у меня на самом деле есть долго работающая программа, которая имеет «большой» DataFrame, который я могу использовать для фильтрации ОГРОМНЫХ DataFrames, и я бы хотел, чтобы этот «большой» DataFrame был повторно использован.

Есть ли способ принудительного повторного использования? (не только внутри одного и того же действия, но и между действиями, я мог бы выжить с одним и тем же действием)

Спасибо!

1 Ответ

2 голосов
/ 07 июля 2019

Хорошо, обновление вопроса.

Подводя итог: ВНУТРИ того же действия, left_semis будет повторно использовать трансляции, в то время как обычные / левые соединения не будут.Не уверен, что это связано с тем фактом, что Spark / разработчики уже знают, что столбцы этого DF вообще не влияют на вывод, поэтому они могут использовать его повторно, или просто отсутствует искра оптимизации.

Моя проблема в основном решена, , хотя было бы замечательно, если бы кто-то знал, как сохранить трансляцию между действиями.

Если я использую left_semi (который является объединениемя собираюсь использовать в моем реальном приложении), трансляция выполняется только один раз.

С:

    dfNormalxx.join(dfBroadcasted, Seq("age"),"left_semi")
.join(dfBroadcasted, Seq("age"),"left_semi").count

План становится (я также изменил размер, чтобы он соответствовал моему реальномуодин, но это не имело никакого значения):

enter image description here

Также общее время стены намного лучше, чем при использовании "left_semi" (я установил 1 исполнителя такон не распараллеливается, просто хотел проверить, действительно ли работа выполнялась дважды):

enter image description here

Несмотря на то, что мой сбор занимает 10 секунд,это ускорит чтение таблицы + groupBys, которые занимают 6-7 минут

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...