динамически генерирует условие соединения в spark / scala - PullRequest
0 голосов
/ 09 мая 2018

Я хочу иметь возможность передать условие соединения для двух фреймов данных в качестве входной строки. Идея состоит в том, чтобы сделать объединение достаточно общим, чтобы пользователь мог выполнить условие, которое ему нравится.

Вот как я это делаю прямо сейчас. Хотя это работает, я думаю, что это не чисто.

val testInput =Array("a=b", "c=d")
val condition: Column = testInput.map(x => testMethod(x)).reduce((a,b) => a.and(b))
firstDataFrame.join(secondDataFrame, condition, "fullouter")

Вот метод теста

def testMethod(inputString: String): Column = {
  val splitted = inputString.split("=")
  col(splitted.apply(0)) === col(splitted.apply(1))
}

Нужна помощь в поиске лучшего способа ввода данных для динамического создания условия соединения

1 Ответ

0 голосов
/ 09 мая 2018

Не уверен, что подобный пользовательский метод принесет слишком много пользы, но если вы должны пойти по этому пути, я бы порекомендовал сделать так, чтобы он также охватывал join on:

  1. столбцы с тем же именем (что довольно часто встречается)
  2. условие неравенства

Пример кода ниже:

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions._

def joinDFs(dfL: DataFrame, dfR: DataFrame, conditions: List[String], joinType: String) = {
  val joinConditions = conditions.map( cond => {
      val arr = cond.split("\\s+")
      if (arr.size != 3) throw new Exception("Invalid join conditions!") else
        arr(1) match {
          case "<"  => dfL(arr(0)) <   dfR(arr(2))
          case "<=" => dfL(arr(0)) <=  dfR(arr(2))
          case "="  => dfL(arr(0)) === dfR(arr(2))
          case ">=" => dfL(arr(0)) >=  dfR(arr(2))
          case ">"  => dfL(arr(0)) >   dfR(arr(2))
          case "!=" => dfL(arr(0)) =!= dfR(arr(2))
          case _ => throw new Exception("Invalid join conditions!")
        }
    } ).
    reduce(_ and _)

  dfL.join(dfR, joinConditions, joinType)
}

val dfLeft = Seq(
  (1, "2018-04-01", "p"),
  (1, "2018-04-01", "q"),
  (2, "2018-05-01", "r")
).toDF("id", "date", "value")

val dfRight = Seq(
  (1, "2018-04-15", "x"),
  (2, "2018-04-15", "y")
).toDF("id", "date", "value")

val conditions = List("id = id", "date <= date")

joinDFs(dfLeft, dfRight, conditions, "left_outer").
  show
// +---+----------+-----+----+----------+-----+
// | id|      date|value|  id|      date|value|
// +---+----------+-----+----+----------+-----+
// |  1|2018-04-01|    p|   1|2018-04-15|    x|
// |  1|2018-04-01|    q|   1|2018-04-15|    x|
// |  2|2018-05-01|    r|null|      null| null|
// +---+----------+-----+----+----------+-----+
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...