Вот мой фрейм данных 1.Я отфильтровываю последние данные на основе отметки времени на ("uniqueFundamentalSet", "PeriodId", "SourceId", "StatementTypeCode", "StatementCurrencyId", "UpdateReason_updateReasonId")
Здесь я сортирую по 6 столбцам.
uniqueFundamentalSet PeriodId SourceId StatementTypeCode StatementCurrencyId UpdateReason_updateReasonId UpdateReasonComment UpdateReasonComment_languageId UpdateReasonEnumerationId FFAction|!| DataPartition PartitionYear TimeStamp
192730230775 297 182 INC 500186 null null null null O|!| Japan 2017 2018-05-10T10:11:15+00:00
192730230775 297 181 INC 500186 1 UpdateReason2UpdateIsNowUPdated 505074 3019680 I|!| Japan 2017 2018-05-10T10:08:01+00:00
192730230775 297 181 INC 500186 4 New Reason Added 505074 3019683 I|!| Japan 2017 2018-05-10T10:08:01+00:00
192730230775 297 180 INC 500186 6 InsertUpdateReason 505074 3019685 I|!| Japan 2017 2018-05-10T09:57:29+00:00
192730230775 297 181 INC 500186 1 UpdateReason2Update 505074 3019680 I|!| Japan 2017 2018-05-10T09:57:29+00:00
192730230775 297 182 INC 500186 6 UpdateReasonToDelete 505074 3019685 I|!| Japan 2017 2018-05-10T09:57:29+00:00
192730230775 297 180 INC 500186 6 InsertUpdateReason 505074 3019685 I|!| Japan 2017 2018-05-10T10:00:40+00:00
192730230775 297 181 INC 500186 1 UpdateReason2Update 505074 3019680 I|!| Japan 2017 2018-05-10T10:00:40+00:00
192730230775 297 182 INC 500186 6 UpdateReasonToDelete 505074 3019685 I|!| Japan 2017 2018-05-10T10:00:40+00:00
Вот код для этого
val windowSpec = Window.partitionBy("uniqueFundamentalSet", "PeriodId", "SourceId", "StatementTypeCode", "StatementCurrencyId", "UpdateReason_updateReasonId").orderBy(unix_timestamp($"TimeStamp", "yyyy-MM-dd'T'HH:mm:ss").cast("timestamp").desc)
val latestForEachKey1 = tempReorder.withColumn("rank", row_number().over(windowSpec))
.filter($"rank" === 1)).drop("rank")
И это дает мне вывод ниже.
uniqueFundamentalSet PeriodId SourceId StatementTypeCode StatementCurrencyId UpdateReason_updateReasonId UpdateReasonComment UpdateReasonComment_languageId UpdateReasonEnumerationId FFAction|!| DataPartition PartitionYear TimeStamp
192730230775 297 180 INC 500186 6 InsertUpdateReason 505074 3019685 I|!| Japan 2017 2018-05-10T10:00:40+00:00
192730230775 297 182 INC 500186 null null null null O|!| Japan 2017 2018-05-10T10:11:15+00:00
192730230775 297 182 INC 500186 6 UpdateReasonToDelete 505074 3019685 I|!| Japan 2017 2018-05-10T10:00:40+00:00
192730230775 297 181 INC 500186 4 New Reason Added 505074 3019683 I|!| Japan 2017 2018-05-10T10:08:01+00:00
192730230775 297 181 INC 500186 1 UpdateReason2UpdateIsNowUPdated 505074 3019680 I|!| Japan 2017 2018-05-10T10:08:01+00:00
Далее я хочу отфильтровать на основе ("uniqueFundamentalSet", "PeriodId", "SourceId", "StatementTypeCode", "StatementCurrencyId") when FFAction|!|="O|!|" or "D|!|".
А затем я хочу объединить последние из первого фрейма данных и второго фрейма данных дляокончательный вывод.
Так что у меня будет самое последнее для меня |! |основан на
("uniqueFundamentalSet", "PeriodId", "SourceId", "StatementTypeCode", "StatementCurrencyId", "UpdateReason_updateReasonId")
and latest for O|!| based on ("uniqueFundamentalSet", "PeriodId", "SourceId", "StatementTypeCode", "StatementCurrencyId").
В этом случае мой окончательный вывод будет выглядеть как
uniqueFundamentalSet PeriodId SourceId StatementTypeCode StatementCurrencyId UpdateReason_updateReasonId UpdateReasonComment UpdateReasonComment_languageId UpdateReasonEnumerationId FFAction|!| DataPartition PartitionYear
192730230775 297 181 INC 500186 4 New Reason Added 505074 3019683 I|!| Japan 2017
192730230775 297 182 INC 500186 null null null null O|!| Japan 2017
192730230775 297 180 INC 500186 6 InsertUpdateReason 505074 3019685 I|!| Japan 2017
192730230775 297 181 INC 500186 1 UpdateReason2UpdateIsNowUPdated 505074 3019680 I|!| Japan 2017
Вот последний код, который я пытаюсь.
import org.apache.spark.sql.expressions._
val windowSpec = Window.partitionBy("uniqueFundamentalSet", "PeriodId", "SourceId", "StatementTypeCode", "StatementCurrencyId", "UpdateReason_updateReasonId").orderBy(unix_timestamp($"TimeStamp", "yyyy-MM-dd'T'HH:mm:ss").cast("timestamp").desc)
val latestForEachKey1 = tempReorder.withColumn("rank", row_number().over(windowSpec))
.filter($"rank" === 1).drop("rank")
val windowSpec2 = Window.partitionBy("uniqueFundamentalSet", "PeriodId", "SourceId", "StatementTypeCode", "StatementCurrencyId").orderBy(unix_timestamp($"TimeStamp", "yyyy-MM-dd'T'HH:mm:ss").cast("timestamp").desc)
val latestForEachKey2 = latestForEachKey1.withColumn("tobefiltered", row_number().over(windowSpec2))
.filter(($"FFAction|!|" === "I|!|" || $"FFAction|!|" === "O|!|" || ($"FFAction|!|" === "D|!|" && $"FFAction|!|" === "D|!|")) && $"tobefiltered" === 1)
.drop("tobefiltered", "TimeStamp")
Но когдаЯ применяю вышеуказанный код, я пропускаю последнюю запись
192730230775 297 181 INC 500186 1 UpdateReason2UpdateIsNowUPdated 505074 3019680 I|!| Japan 2017