Оконная функция с partitionBy для фрейма данных Spark не работает - PullRequest
0 голосов
/ 12 мая 2018

Вот мой фрейм данных 1.Я отфильтровываю последние данные на основе отметки времени на ("uniqueFundamentalSet", "PeriodId", "SourceId", "StatementTypeCode", "StatementCurrencyId", "UpdateReason_updateReasonId")

Здесь я сортирую по 6 столбцам.

uniqueFundamentalSet    PeriodId    SourceId    StatementTypeCode   StatementCurrencyId UpdateReason_updateReasonId UpdateReasonComment UpdateReasonComment_languageId  UpdateReasonEnumerationId   FFAction|!| DataPartition   PartitionYear   TimeStamp
192730230775    297 182 INC 500186  null    null    null    null    O|!|    Japan   2017    2018-05-10T10:11:15+00:00
192730230775    297 181 INC 500186  1   UpdateReason2UpdateIsNowUPdated 505074  3019680 I|!|    Japan   2017    2018-05-10T10:08:01+00:00
192730230775    297 181 INC 500186  4   New Reason Added    505074  3019683 I|!|    Japan   2017    2018-05-10T10:08:01+00:00
192730230775    297 180 INC 500186  6   InsertUpdateReason  505074  3019685 I|!|    Japan   2017    2018-05-10T09:57:29+00:00
192730230775    297 181 INC 500186  1   UpdateReason2Update 505074  3019680 I|!|    Japan   2017    2018-05-10T09:57:29+00:00
192730230775    297 182 INC 500186  6   UpdateReasonToDelete    505074  3019685 I|!|    Japan   2017    2018-05-10T09:57:29+00:00
192730230775    297 180 INC 500186  6   InsertUpdateReason  505074  3019685 I|!|    Japan   2017    2018-05-10T10:00:40+00:00
192730230775    297 181 INC 500186  1   UpdateReason2Update 505074  3019680 I|!|    Japan   2017    2018-05-10T10:00:40+00:00
192730230775    297 182 INC 500186  6   UpdateReasonToDelete    505074  3019685 I|!|    Japan   2017    2018-05-10T10:00:40+00:00

Вот код для этого

val windowSpec = Window.partitionBy("uniqueFundamentalSet", "PeriodId", "SourceId", "StatementTypeCode", "StatementCurrencyId", "UpdateReason_updateReasonId").orderBy(unix_timestamp($"TimeStamp", "yyyy-MM-dd'T'HH:mm:ss").cast("timestamp").desc)
val latestForEachKey1 = tempReorder.withColumn("rank", row_number().over(windowSpec))
  .filter($"rank" === 1)).drop("rank")

И это дает мне вывод ниже.

uniqueFundamentalSet    PeriodId    SourceId    StatementTypeCode   StatementCurrencyId UpdateReason_updateReasonId UpdateReasonComment UpdateReasonComment_languageId  UpdateReasonEnumerationId   FFAction|!| DataPartition   PartitionYear   TimeStamp
192730230775    297 180 INC 500186  6   InsertUpdateReason  505074  3019685 I|!|    Japan   2017    2018-05-10T10:00:40+00:00
192730230775    297 182 INC 500186  null    null    null    null    O|!|    Japan   2017    2018-05-10T10:11:15+00:00
192730230775    297 182 INC 500186  6   UpdateReasonToDelete    505074  3019685 I|!|    Japan   2017    2018-05-10T10:00:40+00:00
192730230775    297 181 INC 500186  4   New Reason Added    505074  3019683 I|!|    Japan   2017    2018-05-10T10:08:01+00:00
192730230775    297 181 INC 500186  1   UpdateReason2UpdateIsNowUPdated 505074  3019680 I|!|    Japan   2017    2018-05-10T10:08:01+00:00

Далее я хочу отфильтровать на основе ("uniqueFundamentalSet", "PeriodId", "SourceId", "StatementTypeCode", "StatementCurrencyId") when FFAction|!|="O|!|" or "D|!|".

А затем я хочу объединить последние из первого фрейма данных и второго фрейма данных дляокончательный вывод.

Так что у меня будет самое последнее для меня |! |основан на

("uniqueFundamentalSet", "PeriodId", "SourceId", "StatementTypeCode", "StatementCurrencyId", "UpdateReason_updateReasonId") 
and latest for O|!| based on ("uniqueFundamentalSet", "PeriodId", "SourceId", "StatementTypeCode", "StatementCurrencyId").

В этом случае мой окончательный вывод будет выглядеть как

uniqueFundamentalSet    PeriodId    SourceId    StatementTypeCode   StatementCurrencyId UpdateReason_updateReasonId UpdateReasonComment UpdateReasonComment_languageId  UpdateReasonEnumerationId   FFAction|!| DataPartition   PartitionYear
192730230775    297 181 INC 500186  4   New Reason Added    505074  3019683 I|!|    Japan   2017
192730230775    297 182 INC 500186  null    null    null    null    O|!|    Japan   2017
192730230775    297 180 INC 500186  6   InsertUpdateReason  505074  3019685 I|!|    Japan   2017
192730230775    297 181 INC 500186  1   UpdateReason2UpdateIsNowUPdated 505074  3019680 I|!|    Japan   2017

Вот последний код, который я пытаюсь.

import org.apache.spark.sql.expressions._
    val windowSpec = Window.partitionBy("uniqueFundamentalSet", "PeriodId", "SourceId", "StatementTypeCode", "StatementCurrencyId", "UpdateReason_updateReasonId").orderBy(unix_timestamp($"TimeStamp", "yyyy-MM-dd'T'HH:mm:ss").cast("timestamp").desc)
    val latestForEachKey1 = tempReorder.withColumn("rank", row_number().over(windowSpec))
      .filter($"rank" === 1).drop("rank")

    val windowSpec2 = Window.partitionBy("uniqueFundamentalSet", "PeriodId", "SourceId", "StatementTypeCode", "StatementCurrencyId").orderBy(unix_timestamp($"TimeStamp", "yyyy-MM-dd'T'HH:mm:ss").cast("timestamp").desc)
    val latestForEachKey2 = latestForEachKey1.withColumn("tobefiltered", row_number().over(windowSpec2))
      .filter(($"FFAction|!|" === "I|!|" || $"FFAction|!|" === "O|!|" || ($"FFAction|!|" === "D|!|" && $"FFAction|!|" === "D|!|")) && $"tobefiltered" === 1)
      .drop("tobefiltered", "TimeStamp")

Но когдаЯ применяю вышеуказанный код, я пропускаю последнюю запись

192730230775    297 181 INC 500186  1   UpdateReason2UpdateIsNowUPdated 505074  3019680 I|!|    Japan   2017 

1 Ответ

0 голосов
/ 12 мая 2018

Вам необходимо переопределить логику , которую вы используете. После выяснения логики, вам нужно определить группы в зависимости от 5 столбцов uniqueFundamentalSet, PeriodId, SourceId, StatementTypeCode, StatementCurrencyId что если O|!| присутствует в столбце FFAction|!| или нет. а затем после определения группы вы можете фильтровать, используя логику номера строки как обычно

Решение прокомментировано для ясности

import org.apache.spark.sql.expressions._
import org.apache.spark.sql.functions._

//window for checking if O|!| is present in the group
val windowSpec = Window.partitionBy("uniqueFundamentalSet", "PeriodId", "SourceId", "StatementTypeCode", "StatementCurrencyId")
//window for filtering out the latest after applying the group defined in previous window
val windowSpec2 = Window.partitionBy("uniqueFundamentalSet", "PeriodId", "SourceId", "StatementTypeCode", "StatementCurrencyId", "group").orderBy(unix_timestamp($"TimeStamp", "yyyy-MM-dd'T'HH:mm:ss").cast("timestamp").desc)

//udf to check if the group has O|!| or not
def containsUdf = udf{(array: Seq[String])=> array.contains("O|!|")}

//applying the window and udf functions and filtering in the latest
val latestForEachKey1 = tempReorder.withColumn("group", when(containsUdf(collect_list("FFAction|!|").over(windowSpec)), lit("same")).otherwise($"UpdateReason_updateReasonId"))
                                    .withColumn("rank", row_number().over(windowSpec2))
                                    .filter($"rank" === 1).drop("rank", "group")

что должно дать вам

+--------------------+--------+--------+-----------------+-------------------+---------------------------+-------------------------------+------------------------------+-------------------------+-----------+-------------+-------------+-------------------------+
|uniqueFundamentalSet|PeriodId|SourceId|StatementTypeCode|StatementCurrencyId|UpdateReason_updateReasonId|UpdateReasonComment            |UpdateReasonComment_languageId|UpdateReasonEnumerationId|FFAction|!||DataPartition|PartitionYear|TimeStamp                |
+--------------------+--------+--------+-----------------+-------------------+---------------------------+-------------------------------+------------------------------+-------------------------+-----------+-------------+-------------+-------------------------+
|192730230775        |297     |181     |INC              |500186             |1                          |UpdateReason2UpdateIsNowUPdated|505074                        |3019680                  |I|!|       |Japan        |2017         |2018-05-10T10:08:01+00:00|
|192730230775        |297     |181     |INC              |500186             |4                          |New Reason Added               |505074                        |3019683                  |I|!|       |Japan        |2017         |2018-05-10T10:08:01+00:00|
|192730230775        |297     |182     |INC              |500186             |null                       |null                           |null                          |null                     |O|!|       |Japan        |2017         |2018-05-10T10:11:15+00:00|
|192730230775        |297     |180     |INC              |500186             |6                          |InsertUpdateReason             |505074                        |3019685                  |I|!|       |Japan        |2017         |2018-05-10T10:00:40+00:00|
+--------------------+--------+--------+-----------------+-------------------+---------------------------+-------------------------------+------------------------------+-------------------------+-----------+-------------+-------------+-------------------------+
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...