Я новичок в мире Spark и Scala.
Это мой пример входных данных:
val x= Seq(
(1116685002, "10/2015", "10/2015", 0, "N"),
(1116685011, "07/2015", "06/2015", 1116685012, "Y"),
(1116685012, "09/2015", "08/2015", 1116685002, "Y"),
(1116685003, "10/2015", "10/2015", 0, "N")
).toDF("PRVSN_COMMDTY_ID", "PRVSN_COMMDTY_LNCH_DT_TXT", "PROD_LNCH_DT_TXT", "XFER_TO_PRVSN_COMMDTY_ID", "XFER_FLG")
val y= Seq(
(1116685002, "20171015"),
(1116685011, "20170515"),
(1116685012, "20170915"),
(1116685003, "20170920"),
(1116685003, "20170918")
).toDF("PRVSN_COMMDTY_ID", "INVC_BUS_EFF_DT")
Соединение этих двух dfs:
val records = x.join(y,
x("PRVSN_COMMDTY_ID") === y("PRVSN_COMMDTY_ID"), joinType = "inner")
.groupBy(
x("PRVSN_COMMDTY_ID"),
x("PRVSN_COMMDTY_LNCH_DT_TXT"),
x("PROD_LNCH_DT_TXT"),
x("XFER_TO_PRVSN_COMMDTY_ID"),
x("XFER_FLG")
)
.agg(min(y("INVC_BUS_EFF_DT")))
.select(
col("PRVSN_COMMDTY_ID"),
col("PROD_LNCH_DT_TXT"),
col("PRVSN_COMMDTY_LNCH_DT_TXT"),
col("XFER_TO_PRVSN_COMMDTY_ID"),
col("XFER_FLG"),
col("min(INVC_BUS_EFF_DT)").alias("MIN_INVC_BUS_EFF_DT")
)
Присоединение к выводунаходится в этом изображении .
В этом кадре данных у нас есть набор родительских и дочерних отношений, мне нужно найти дочерние записи для конкретного родителя, в зависимости от XFER_TO_PRVSN_COMMDTY_ID
, родительское значение которого соответствует дочерним записям,
После определения дочерних записей для конкретного родителя мне нужно применить функцию min для получения самых ранних значений.
Ожидаемый результат
+----------------+----------------+-------------------------+-----------------------+--------+-------------------+
|PRVSN_COMMDTY_ID|PROD_LNCH_DT_TXT|PRVSN_COMMDTY_LNCH_DT_TXT|XFER_TO_PRVSN_COMMDY_ID|XFER_FLG|MIN_INVC_BUS_EFF_DT|
+----------------+----------------+-------------------------+-----------------------+--------+-------------------+
| 1116685002| 06/2015| 07/2015| 0| N| 20170515|
+----------------+----------------+-------------------------+-----------------------+--------+-------------------+