Spark Scala: объединение двух таблиц и извлечение данных с максимальной датой (см. Описание) - PullRequest
0 голосов
/ 12 октября 2018

Я хочу объединить две tables A and B и выбрать для каждого значения записи с максимальной датой из table B.

Рассмотрим следующие таблицы:

Table A:

+---+-----+----------+
| id|Value|start_date|
+---+---- +----------+
| 1 |   a | 1/1/2018 |
| 2 |   a | 4/1/2018 |
| 3 |   a | 8/1/2018 |
| 4 |   c | 1/1/2018 |
| 5 |   d | 1/1/2018 |
| 6 |   e | 1/1/2018 |
+---+-----+----------+

Table B:

+---+-----+----------+
|Key|Value|sent_date |
+---+---- +----------+
| x |   a | 2/1/2018 |
| y |   a | 7/1/2018 |
| z |   a | 11/1/2018|
| p |   c | 5/1/2018 |
| q |   d | 5/1/2018 |
| r |   e | 5/1/2018 |
+---+-----+----------+

Цель состоит в том, чтобывведите в столбец id от Table A до Table B для каждого значения в Table B.Для этого таблицы A и B необходимо объединить со столбцом value, и для каждой записи в B, max(A.start_date) для всех данных в столбце Value в Table A находится с условиемA.start_date < B.sent_date

Давайте рассмотрим value=a здесь.В table A, мы можем видеть 3 записи для Value=a с 3 различными start_date.Таким образом, при объединении Table B, для value=a с sent_date=2/1/2018, берется запись с max(start_date) для start_date, которые меньше sent_date in Table B (в данном случае 01.01.2008) и соответствующие данные в столбце A.id вытягивается до Table B.

Аналогично для записи с value=a и sent_date = 11/1/2018 в Table B, id = 3 из таблицы A необходимо перенести в table B.

Результат должен быть следующим:

+---+-----+----------+---+
|Key|Value|sent_date |id |
+---+---- +----------+---+
| x |   a | 2/1/2018 | 1 |
| y |   a | 7/1/2018 | 2 |
| z |   a | 11/1/2018| 3 |
| p |   c | 5/1/2018 | 4 |
| q |   d | 5/1/2018 | 5 |
| r |   e | 5/1/2018 | 6 |
+---+-----+----------+---+

Я использую Spark 2.3.Я соединил две таблицы (используя Dataframe) и нашел max(start_date) в зависимости от условия.Но я не могу понять, как вытащить записи здесь.

Может ли кто-нибудь помочь мне здесь

Заранее спасибо !!

1 Ответ

0 голосов
/ 12 октября 2018

Я только что изменил дату «01.11.2008» на «01.09.2008», так как сортировка строк дает неверные результаты.При преобразовании в дату логика все равно будет работать.См. Ниже

scala> val df_a = Seq((1,"a","1/1/2018"),
     | (2,"a","4/1/2018"),
     | (3,"a","8/1/2018"),
     | (4,"c","1/1/2018"),
     | (5,"d","1/1/2018"),
     | (6,"e","1/1/2018")).toDF("id","value","start_date")
df_a: org.apache.spark.sql.DataFrame = [id: int, value: string ... 1 more field]

scala> val df_b = Seq(("x","a","2/1/2018"),
     | ("y","a","7/1/2018"),
     | ("z","a","9/1/2018"),
     | ("p","c","5/1/2018"),
     | ("q","d","5/1/2018"),
     | ("r","e","5/1/2018")).toDF("key","valueb","sent_date")
df_b: org.apache.spark.sql.DataFrame = [key: string, valueb: string ... 1 more field]

scala>  val df_join = df_b.join(df_a,'valueb==='valuea,"inner")
df_join: org.apache.spark.sql.DataFrame = [key: string, valueb: string ... 4 more fields]

scala> df_join.filter('sent_date >= 'start_date).withColumn("rank", rank().over(Window.partitionBy('key,'valueb,'sent_date).orderBy('start_date.desc))).filter('rank===1).drop("valuea","start_date","rank").show()
+---+------+---------+---+
|key|valueb|sent_date| id|
+---+------+---------+---+
|  q|     d| 5/1/2018|  5|
|  p|     c| 5/1/2018|  4|
|  r|     e| 5/1/2018|  6|
|  x|     a| 2/1/2018|  1|
|  y|     a| 7/1/2018|  2|
|  z|     a| 9/1/2018|  3|
+---+------+---------+---+


scala>

ОБНОВЛЕНИЕ

Ниже приведен udf для обработки строк даты в форматах MM / dd / yyyy

scala> def dateConv(x:String):String=
     | {
     | val y = x.split("/").map(_.toInt).map("%02d".format(_))
     | y(2)+"-"+y(0)+"-"+y(1)
     | }
dateConv: (x: String)String

scala>  val udfdateconv = udf( dateConv(_:String):String )
udfdateconv: org.apache.spark.sql.expressions.UserDefinedFunction = UserDefinedFunction(<function1>,StringType,Some(List(StringType)))

scala> val df_a_dt = df_a.withColumn("start_date",date_format(udfdateconv('start_date),"yyyy-MM-dd").cast("date"))
df_a_dt: org.apache.spark.sql.DataFrame = [id: int, valuea: string ... 1 more field]

scala> df_a_dt.printSchema
root
 |-- id: integer (nullable = false)
 |-- valuea: string (nullable = true)
 |-- start_date: date (nullable = true)


scala> df_a_dt.show()
+---+------+----------+
| id|valuea|start_date|
+---+------+----------+
|  1|     a|2018-01-01|
|  2|     a|2018-04-01|
|  3|     a|2018-08-01|
|  4|     c|2018-01-01|
|  5|     d|2018-01-01|
|  6|     e|2018-01-01|
+---+------+----------+


scala>
...