Spark Dataset / Dataframe объединяют NULL ключ перекоса - PullRequest
0 голосов
/ 15 сентября 2018

Работая с объединениями Spark Dataset / DataFrame, я столкнулся с длительным запуском и не смог выполнить задания OOM.

Вот ввод:

  • ~ 10 наборов данных различного размера, в основном огромных (> 1 ТБ)
  • все соединены слева в один базовый набор данных
  • некоторые из ключей соединения: null

После некоторого анализа я обнаружил, что причиной неудачных и медленных заданий является null ключ перекоса: когда на левой стороне миллионы записей с ключом соединения null.

Я применил метод грубой силы для решения этой проблемы, и вот я хочу поделиться им.

Если у вас есть более качественные или встроенные решения (для обычного Apache Spark), пожалуйста, поделитесь им.

Ответы [ 2 ]

0 голосов
/ 25 ноября 2018

У меня была такая же проблема некоторое время назад, но я выбрал другой подход после некоторых тестов производительности. Это зависит от ваших данных, они подскажут вам, какой алгоритм лучше подходит для решения этой проблемы объединения.

В моем случае у меня более 30% данных с нулем в левой части соединения, и данные в формате паркета. Учитывая это, для меня лучше выполнить filter, где этот ключ имеет значение NULL и где этот ключ не равен NULL, присоединяться только тогда, когда он не равен NULL, и позже объединять оба данных.

val data = ...
val notJoinable = data.filter('keyToJoin.isNull)
val joinable = data.filter('keyToJoin.isNotNull)

joinable.join(...) union notJoinable

Он также избегает горячих точек. Если я использую ваш подход (отрицательные числа / все, что не является «присоединяемым» значением), spark перетасует все эти данные, которые представляют собой много данных (более 30%).

Просто пытаюсь показать вам другой подход к вашей проблеме,

0 голосов
/ 15 сентября 2018

Вот решение, к которому я пришел:

  /**
    * Expression that produce negative random between -1 and -`lowestValue`(inclusively).
    *
    * @example
    *          {{{
    *             spark
    *                  .range(1, 100)
    *                  .withColumn("negative", negativeRandomWithin(3))
    *                  .select("negative")
    *                  .distinct()
    *                  .show(false)
    *          }}}
    *          +--------+
    *          |negative|
    *          +--------+
    *          |-2      |
    *          |-3      |
    *          |-1      |
    *          +--------+
    */
  private[transformation] def negativeRandomWithin(lowestValue: Long): Column = {
    negate(positiveRandomWithin(lowestValue)) - 1
  }

  /**
    * Expression that produce positive random between 0 and `highestValue`(exclusively).
    *
    * @example
    *          {{{
    *             spark
    *                  .range(1, 100)
    *                  .withColumn("positive", positiveRandomWithin(3))
    *                  .select("positive")
    *                  .distinct()
    *                  .show(false)
    *          }}}
    *          +--------+
    *          |positive|
    *          +--------+
    *          |0       |
    *          |1       |
    *          |2       |
    *          +--------+
    */
  private[transformation] def positiveRandomWithin(highestValue: Long) = {
    pmod((rand * highestValue).cast(LongType), lit(highestValue))
  }

  implicit class SkewedDataFrameExt(val underlying: DataFrame) extends AnyVal {

    /**
      * Particular optimized version of left outer join where left side of join has skewed `null` field.
      *
      * @note
      *       It works only for single column join which is applicable for `isNotNull`.
      *
      * Optimization algorithm:
      *   1. replace left dataset `null` values with negative number within range between -1 and - `nullNumBuckets`(10000 by default)
      *   2. use appended column, with original join column value and `null` replacements, as join column from left dataset
      *      appended column name builds using original left join column and `skewedColumnPostFix` separated by underscore.
      *
      * @note there is no checks how many `null` values on left dataset before applying above steps,
      *       as well as there is no checks does it sort merge join or broadcast.
      *
      * IMPORTANT: If left dataset already has appended column name, it will be reused to benefit already repartitioned data on the left
      *
      * HIGHLY IMPORTANT: right dataset should not contain negative values in `joinRightCol`
      */
    private[transformation] def nullSkewLeftJoin(right: DataFrame,
                                                 joinLeftCol: Column,
                                                 joinRightCol: Column,
                                                 skewedColumnPostFix: String = "skewed_column",
                                                 nullNumBuckets: Int = 10000): DataFrame = {

      val skewedTempColumn = s"${joinLeftCol.toString()}_$skewedColumnPostFix"

      if (underlying.columns.exists(_ equalsIgnoreCase skewedTempColumn)) {
        underlying.join(right.where(joinRightCol.isNotNull), col(skewedTempColumn) === joinRightCol, "left")
      } else {
        underlying
          .withColumn(skewedTempColumn,
                      when(joinLeftCol.isNotNull, joinLeftCol).otherwise(negativeRandomWithin(nullNumBuckets)))
          .join(right.where(joinRightCol.isNotNull), col(skewedTempColumn) === joinRightCol, "left")
      }
    }
  }

Вкратце: я заменяю значения ключа левого набора данных null на отрицательный диапазон, чтобы сделать его равномерно распределенным.

ПРИМЕЧАНИЕ: это решение только для левого соединения и null перекос ключа соединения. Я не хотел взрывать правильный набор данных и делать перекос для любого ключа. Кроме того, после этого шага null значения ключей соединения будут распределены по разным разделам, следовательно, mapPartitions и т. Д. Не будут работать.

Как итог, вышеупомянутое решение помогло мне, но я хочу увидеть больше решений для этого типа проблем с объединением наборов данных.

...