Here is the solution I arrived at:
import org.apache.spark.sql.{Column, DataFrame}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.LongType

/**
* Expression that produces a negative random number between -1 and -`lowestValue` (inclusive).
*
* @example
* {{{
* spark
* .range(1, 100)
* .withColumn("negative", negativeRandomWithin(3))
* .select("negative")
* .distinct()
* .show(false)
* }}}
* +--------+
* |negative|
* +--------+
* |-2 |
* |-3 |
* |-1 |
* +--------+
*/
private[transformation] def negativeRandomWithin(lowestValue: Long): Column = {
negate(positiveRandomWithin(lowestValue)) - 1
}
/**
* Expression that produces a random number between 0 (inclusive) and `highestValue` (exclusive).
*
* @example
* {{{
* spark
* .range(1, 100)
* .withColumn("positive", positiveRandomWithin(3))
* .select("positive")
* .distinct()
* .show(false)
* }}}
* +--------+
* |positive|
* +--------+
* |0 |
* |1 |
* |2 |
* +--------+
*/
private[transformation] def positiveRandomWithin(highestValue: Long): Column = {
pmod((rand() * highestValue).cast(LongType), lit(highestValue))
}
implicit class SkewedDataFrameExt(val underlying: DataFrame) extends AnyVal {
/**
* An optimized version of a left outer join for the case where the left side's join key is heavily skewed towards `null`.
*
* @note
* It works only for a single-column join to which `isNotNull` is applicable.
*
* Optimization algorithm:
* 1. replace `null` values in the left dataset with negative numbers in the range between -1 and -`nullNumBuckets` (10000 by default)
* 2. use the appended column, holding the original join column values plus the `null` replacements, as the left dataset's join column;
* the appended column name is built from the original left join column name and `skewedColumnPostFix`, separated by an underscore.
*
* @note there are no checks of how many `null` values the left dataset contains before the steps above are applied,
* nor any check of whether the join is a sort-merge join or a broadcast join.
*
* IMPORTANT: if the left dataset already contains the appended column, it is reused, to benefit from left-side data that has already been repartitioned.
*
* HIGHLY IMPORTANT: the right dataset must not contain negative values in `joinRightCol`.
*/
private[transformation] def nullSkewLeftJoin(right: DataFrame,
joinLeftCol: Column,
joinRightCol: Column,
skewedColumnPostFix: String = "skewed_column",
nullNumBuckets: Int = 10000): DataFrame = {
val skewedTempColumn = s"${joinLeftCol.toString()}_$skewedColumnPostFix"
if (underlying.columns.exists(_ equalsIgnoreCase skewedTempColumn)) {
underlying.join(right.where(joinRightCol.isNotNull), col(skewedTempColumn) === joinRightCol, "left")
} else {
underlying
.withColumn(skewedTempColumn,
when(joinLeftCol.isNotNull, joinLeftCol).otherwise(negativeRandomWithin(nullNumBuckets)))
.join(right.where(joinRightCol.isNotNull), col(skewedTempColumn) === joinRightCol, "left")
}
}
}
In short: I replace the left dataset's `null` join key values with a negative range so that they become uniformly distributed.
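To make that concrete, here is a minimal sketch of the replacement step in isolation (the data and column names are made up, and it assumes `negativeRandomWithin` is in scope, e.g. inside the same `transformation` package):

import org.apache.spark.sql.functions._

// Illustrative left dataset where ~99% of the join keys are null.
val left = spark.range(0, 1000000)
  .select(when(col("id") % 100 === 0, col("id")).as("key"))

// The same replacement that nullSkewLeftJoin performs internally.
val patched = left.withColumn("key_skewed_column",
  when(col("key").isNotNull, col("key")).otherwise(negativeRandomWithin(10000)))

// The former nulls now occupy up to 10000 distinct negative buckets, so a
// hash partitioner spreads them out instead of piling them into one partition.
patched
  .where(col("key").isNull)
  .agg(countDistinct(col("key_skewed_column")))
  .show(false)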
NOTE: this solution only targets a left join with a `null`-skewed join key. I did not want to explode the right dataset in order to handle skew on arbitrary keys. Also, after this step the `null` join key values end up distributed across different partitions, so `mapPartitions` and the like will no longer work.
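For completeness, a hypothetical end-to-end usage (the `orders` and `customers` DataFrames and their column names are invented; the call assumes it runs inside the `transformation` package, since the extension is `private[transformation]`):

import org.apache.spark.sql.functions.col

// orders.customer_id is mostly null; customers.id contains no negative values,
// as required by the HIGHLY IMPORTANT note above.
val joined = orders
  .nullSkewLeftJoin(customers,
    joinLeftCol = col("customer_id"),
    joinRightCol = col("id"))
  .drop("customer_id_skewed_column") // temp column name: <left column>_skewed_column

The temporary column can be dropped afterwards, since the join condition has already been evaluated against it.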
To sum up, the solution above worked for me, but I would like to see more approaches to this kind of dataset join problem.