Используя Spark SQL joinWith, как я могу объединить два набора данных, чтобы сопоставить текущие записи с их предыдущими записями на основе даты? - PullRequest
0 голосов
/ 16 января 2020

Я пытаюсь объединить два набора данных показаний счетчиков в Spark SQL, используя joinWith, так что возвращаемый тип - Набор данных [(Чтение, Чтение)]. Цель состоит в том, чтобы сопоставить каждую строку в первом наборе данных (называемом Текущим) с предыдущей записью во втором наборе данных (называемом Предыдущим) на основе столбца даты.

Мне нужно сначала присоединиться к ключу счетчика, а затем присоединитесь, сравнив дату, и найдя следующую наибольшую дату, которая меньше текущей даты чтения (то есть предыдущего чтения).

Вот то, что я пробовал, но я думаю, что это слишком тривиально. Я также получаю сообщение об ошибке «Не удается разрешить» с MAX.

val joined = Current.joinWith(
      Previous,
      (Current("Meter_Key") === Previous("Meter_Key"))
        && (Current("Reading_Dt_Key") > MAX(Previous("Reading_Dt_Key"))
    )

Кто-нибудь может помочь?

1 Ответ

0 голосов
/ 17 января 2020

Не пытался использовать LAG, думаю, что также будет работать. Но посмотрел на ваше требование с joinWith и решил применить логи c по соображениям производительности. Многие шаги в работе пропущены. Используются разные имена, вы можете абстрагироваться, переименовывать и отбрасывать столбцы.

import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window
import spark.implicits._

case class mtr0(mtr: String, seqNum: Int)
case class mtr(mtr: String, seqNum: Int, rank: Int)

// Gen data & optimize for JOINing, just interested in max 2 records for ranked sets.
val curr0 = Seq(
mtr0("m1", 1),
mtr0("m1", 2),
mtr0("m1", 3),
mtr0("m2", 7)
).toDS

val curr1 = curr0.withColumn("rank", row_number().over(Window.partitionBy($"mtr").orderBy($"seqNum".desc)))

// Reduce before JOIN.
val currF=curr1.filter($"rank" === 1 ).as[mtr]
//currF.show(false) 
val prevF=curr1.filter($"rank" === 2 ).as[mtr]
//prevF.show(false) 

val selfDF = currF.as("curr").joinWith(prevF.as("prev"),
( col("curr.mtr") === col("prev.mtr") && (col("curr.rank") === 1) && (col("prev.rank") === 2)),"left")

// Null value evident when only 1 entry per meter.
selfDF.show(false)

возвращает:

+----------+----------+
|_1        |_2        |
+----------+----------+
|[m1, 3, 1]|[m1, 2, 2]|
|[m2, 7, 1]|null      |
+----------+----------+

selfDF: org.apache.spark.sql.Dataset[(mtr, mtr)] = [_1: struct<mtr: string, seqNum: int ... 1 more field>, _2: struct<mtr: string, seqNum: int ... 1 more field>]
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...