Мне нравится писать функцию, которая обрабатывает перекос данных при объединении двух наборов данных Spark.
Решение для DataFrames простое:
def saltedJoin(left: DataFrame, right: DataFrame, e: Column, kind: String = "inner", replicas: Int): DataFrame = {
val saltedLeft = left.
withColumn("__temporarily__", typedLit((0 until replicas).toArray)).
withColumn("__skew_left__", explode($"__temporarily__")).
drop($"__temporarily__").
repartition($"__skew_left__")
val saltedRight = right.
withColumn("__temporarily__", rand).
withColumn("__skew_right__", ($"__temporarily__" * replicas).cast("bigint")).
drop("__temporarily__").
repartition($"__skew_right__")
saltedLeft.
join(saltedRight, $"__skew_left__" === $"__skew_right__" && e, kind).
drop($"__skew_left__").
drop($"__skew_right__")
}
И вы используете такую функцию:
val joined = saltedJoin(df alias "l", df alias "r", $"l.x" === $"r.x", replicas = 5)
Однако я не знаю, как написать функцию соединения для Dataset
экземпляров.Пока что я написал следующее:
def saltedJoinWith[A: Encoder : TypeTag, B: Encoder : TypeTag](left: Dataset[A],
right: Dataset[B],
e: Column,
kind: String = "inner",
replicas: Int): Dataset[(A, B)] = {
val spark = left.sparkSession
val random = new Random()
import spark.implicits._
val saltedLeft: Dataset[(A, Int)] = left flatMap (a => 0 until replicas map ((a, _)))
val saltedRight: Dataset[(B, Int)] = right map ((_, random.nextInt(replicas)))
saltedLeft.joinWith(saltedRight, saltedLeft("_2") === saltedRight("_2") && e, kind).map(x => (x._1._1, x._2._1))
}
Это, очевидно, неправильное решение, поскольку условие соединения e
не указывает на столбцы, определенные в saltedRight
и saltedLeft
.Он указывает на столбцы в saltedRight._1
и saltedLeft._1
.Так, например, val j = saltedJoinWith(ds alias "l", ds alias "r", $"l.x" === $"r.x", replicas = 5)
завершится с ошибкой во время выполнения со следующим исключением:
org.apache.spark.sql.AnalysisException: cannot resolve '`l.x`' given input columns: [_1, _2];;
Я использую Apache Spark 2.2.