Как сделать объединение двух наборов данных в Apache Spark на основе времени? - PullRequest
0 голосов
/ 26 декабря 2018

Учитывая два набора данных S и R, оба со столбцом времени (t), как описано ниже:

//snapshot with id at t
case class S(id: String, t: Int)

//reference data at t
case class R(t: Int, fk: String)

//Example test case
val ss: Dataset[S] = Seq(S("a", 1), S("a", 3), S("b", 5), S("b", 7))
      .toDS

    val rs: Dataset[R] = Seq(R(0, "a"), R(2, "a"), R(6, "b"))
      .toDS

    val srs: Dataset[(S, Option[R])] = ss
      .asOfJoin(rs)

    srs.collect() must contain theSameElementsAs
      Seq((S("a", 1), Some(R(0, "a"))), (S("a", 3), Some(R(2, "a"))), (S("b", 5), None), (S("b", 7), Some(R(6, "b"))))

Цель состоит в том, чтобы найти самую последнюю строку в R, которая соответствует идентификатору E, если это возможно, т.е. R можетбыть необязательным в выходных данных.

asOfJoin определяется следующим образом:

  implicit class SOps(ss: Dataset[S]) {
    def asOfJoin(rs: Dataset[R])(implicit spark: SparkSession): Dataset[(S, Option[R])] = ???
  }

Одно из решений с использованием API набора данных выглядит следующим образом:

def asOfJoin(rs: Dataset[R])(implicit spark: SparkSession): Dataset[(S, Option[R])] = {
      import spark.implicits._

      ss
        .joinWith(
          rs,
          ss("id") === rs("fk") && ss("t") >= rs("t"),
          "left_outer")
       .map { case (l, r) => (l, Option(r)) }
       .groupByKey { case (s, _) => s }
       .reduceGroups { (x, y) =>
         (x, y) match {
           case ((_, Some(R(tx, _))), (_, Some(R(ty, _)))) => if (tx > ty) x else y
           case _ => x
         }
       }
       .map { case (_, r) => r }
}

Ответы [ 2 ]

0 голосов
/ 04 января 2019

Я взял комментарий @bupt_ljy о том, как избегать тэта-соединения, и, похоже, очень хорошо масштабируется:*

0 голосов
/ 26 декабря 2018

Я не уверен насчет размера набора данных S и набора данных R. Но из ваших кодов я вижу, что эффективность объединения (с неравными выражениями) плохая, и я могу дать некоторые предложения, основанные на разныхконкретные сценарии:

В наборе данных R или в наборе данных S не слишком много данных.

Я предлагаю, чтобы вы могли транслировать меньший набор данных и завершить бизнес-логику вискра udf с помощью широковещательной переменной.Таким образом, вам не нужен процесс shuffle (объединение), который помогает вам сэкономить много времени и ресурсов.

Для каждого уникального идентификатора count (отличный t) невелик.

Я полагаю, что вы можете выполнить предварительную агрегацию, сгруппировав id и collect_set (t) следующим образом:

select id,collect_set(t) as t_set from S 

Таким образом, вы можете удалить неравное выражение (ss ("t")> = rs ("t")) в соединении.И напишите свою бизнес-логику с двумя наборами t_sets из набора данных S и набора данных R.

Для других сценариев:

Я предлагаю вам оптимизировать коды с равным объединением иоконная функция.Поскольку я более знаком с SQL, я пишу здесь SQL, который можно преобразовать в API набора данных:

select
  sid,
  st,
  rt
from
(
    select 
      S.id as sid,
      S.t as st,
      R.t as rt,
      row_number() over (partition by S.id order by (S.t - NVL(R.t, 0)) rn
    from
      S
    left join R on S.id = R.fk) tbl
where tbl.rn = 1
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...