Scala Spark: обработка данных в соединениях с посолкой - PullRequest
0 голосов
/ 28 февраля 2019

Мне нравится писать функцию, которая обрабатывает перекос данных при объединении двух наборов данных Spark.

Решение для DataFrames простое:

def saltedJoin(left: DataFrame, right: DataFrame, e: Column, kind: String = "inner", replicas: Int): DataFrame = {
    val saltedLeft = left.
      withColumn("__temporarily__", typedLit((0 until replicas).toArray)).
      withColumn("__skew_left__", explode($"__temporarily__")).
      drop($"__temporarily__").
      repartition($"__skew_left__")

    val saltedRight = right.
      withColumn("__temporarily__", rand).
      withColumn("__skew_right__", ($"__temporarily__" * replicas).cast("bigint")).
      drop("__temporarily__").
      repartition($"__skew_right__")

    saltedLeft.
      join(saltedRight, $"__skew_left__" === $"__skew_right__" && e, kind).
      drop($"__skew_left__").
      drop($"__skew_right__")
  }

И вы используете такую ​​функцию:

val joined = saltedJoin(df alias "l", df alias "r", $"l.x" === $"r.x", replicas = 5)

Однако я не знаю, как написать функцию соединения для Dataset экземпляров.Пока что я написал следующее:

def saltedJoinWith[A: Encoder : TypeTag, B: Encoder : TypeTag](left: Dataset[A],
                                             right: Dataset[B],
                                             e: Column,
                                             kind: String = "inner",
                                             replicas: Int): Dataset[(A, B)] = {
    val spark = left.sparkSession
    val random = new Random()
    import spark.implicits._

    val saltedLeft: Dataset[(A, Int)] = left flatMap (a => 0 until replicas map ((a, _)))
    val saltedRight: Dataset[(B, Int)] = right map ((_, random.nextInt(replicas)))

    saltedLeft.joinWith(saltedRight, saltedLeft("_2") === saltedRight("_2") && e, kind).map(x => (x._1._1, x._2._1))
  }

Это, очевидно, неправильное решение, поскольку условие соединения e не указывает на столбцы, определенные в saltedRight и saltedLeft.Он указывает на столбцы в saltedRight._1 и saltedLeft._1.Так, например, val j = saltedJoinWith(ds alias "l", ds alias "r", $"l.x" === $"r.x", replicas = 5) завершится с ошибкой во время выполнения со следующим исключением:

org.apache.spark.sql.AnalysisException: cannot resolve '`l.x`' given input columns: [_1, _2];;

Я использую Apache Spark 2.2.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...