Удаление дубликатов записей с использованием оконной функции в Spark Scala - PullRequest
0 голосов
/ 10 мая 2018

Или просто для простоты понимания, у меня есть фрейм данных.

DataPartition   TimeStamp   OrganizationID  SourceID    AuditorID   AuditorEnumerationId    AuditorOpinionCode  AuditorOpinionId    IsPlayingAuditorRole    IsPlayingCSRAuditorRole IsPlayingTaxAdvisorRole FFAction|!| AuditorOpinionOnInternalControlCode AuditorOpinionOnGoingConcernCode    AuditorOpinionOnInternalControlsId  AuditorOpinionOnGoingConcernId  rank
Japan   2018-05-03T09:52:48+00:00   4295876589  194 null    null    null    null    null    null    null    O|!|    null    null    null    null    1
Japan   2018-05-03T09:52:48+00:00   4295876589  194 2719    3023331 AOP 3010542 true    false   true    O|!|    null    null    null    null    1
Japan   2018-05-03T09:52:48+00:00   4295876589  195 16157   1002485247  UWE 3010547 true    false   false   O|!|    null    null    null    null    1
Japan   2018-05-03T07:36:47+00:00   4295876589  196 3252    3024053 ONC 3020538 true    false   true    O|!|    null    null    null    null    1
Japan   2018-05-03T07:36:47+00:00   4295876589  195 5937    3026578 NOP 3010543 true    false   true    O|!|    null    null    null    null    1
Japan   2018-05-02T10:37:50+00:00   4295876589  156 null    null    null    null    null    null    null    O|!|    null    null    null    null    1
Japan   2018-05-02T10:37:50+00:00   4295876589  157 null    null    null    null    null    null    null    O|!|    null    null    null    null    1
Japan   2018-05-02T10:37:56+00:00   4295876589  193 null    null    null    null    null    null    null    O|!|    null    null    null    null    1
Japan   2018-05-03T08:10:19+00:00   4295876589  196 null    null    null    null    null    null    null    D|!|    null    null    null    null    1
Japan   2018-05-03T09:52:48+00:00   4295876589  195 null    null    null    null    null    null    null    O|!|    null    null    null    null    1

Теперь мне нужно выбрать столбцы, у которых есть Rank = 1 и AuditorID! = Null, но AuditorID =! = Null будетприменимо только для FFAction |! | = "O".

В этом случае мой кадр выходных данных должен выглядеть примерно так:

DataPartition   TimeStamp   OrganizationID  SourceID    AuditorID   AuditorEnumerationId    AuditorOpinionCode  AuditorOpinionId    IsPlayingAuditorRole    IsPlayingCSRAuditorRole IsPlayingTaxAdvisorRole FFAction|!| AuditorOpinionOnInternalControlCode AuditorOpinionOnGoingConcernCode    AuditorOpinionOnInternalControlsId  AuditorOpinionOnGoingConcernId  rank

Japan   2018-05-03T09:52:48+00:00   4295876589  194 2719    3023331 AOP 3010542 true    false   true    O|!|    null    null    null    null    1
Japan   2018-05-03T09:52:48+00:00   4295876589  195 16157   1002485247  UWE 3010547 true    false   false   O|!|    null    null    null    null    1
Japan   2018-05-03T07:36:47+00:00   4295876589  196 3252    3024053 ONC 3020538 true    false   true    O|!|    null    null    null    null    1
Japan   2018-05-03T07:36:47+00:00   4295876589  195 5937    3026578 NOP 3010543 true    false   true    O|!|    null    null    null    null    1
Japan   2018-05-02T10:37:56+00:00   4295876589  193 null    null    null    null    null    null    null    I|!|    null    null    null    null    1
Japan   2018-05-03T08:10:19+00:00   4295876589  196 null    null    null    null    null    null    null    D|!|    null    null    null    null    1

Вот мой код

import org.apache.spark.sql.expressions._
    val windowSpec = Window.partitionBy("OrganizationID", "SourceID", "AuditorID").orderBy(unix_timestamp($"TimeStamp", "yyyy-MM-dd'T'HH:mm:ss").cast("timestamp").desc)
    val latestForEachKey1 = finaldf.withColumn("rank", row_number().over(windowSpec))
    .filter($"rank" === 1 && $"AuditorID" =!= "null")

Сценарий 2 ...

Вот мой фрейм данных

uniqueFundamentalSet    PeriodId    SourceId    StatementTypeCode   StatementCurrencyId UpdateReason_updateReasonId UpdateReasonComment UpdateReasonComment_languageId  UpdateReasonEnumerationId   FFAction|!| DataPartition   PartitionYear   TimeStamp
192730230775    297 182 INC 500186  6   UpdateReasonToDelete    505074  3019685 I|!|    Japan   2017    2018-05-10T09:57:29+00:00
192730230775    297 182 INC 500186  6   UpdateReasonToDelete    505074  3019685 I|!|    Japan   2017    2018-05-10T10:00:40+00:00
192730230775    297 182 INC 500186  null    null    null    null    O|!|    Japan   2017    2018-05-10T10:11:15+00:00
192730230775    310 182 INC 500186  null    null    null    null    O|!|    Japan   2018    2018-05-10T08:30:53+00:00

Когда я применяю предложенный код в ответ, я получаю вывод ниже

val windowSpec = Window.partitionBy("uniqueFundamentalSet", "PeriodId", "SourceId", "StatementTypeCode", "StatementCurrencyId", "UpdateReason_updateReasonId").orderBy(unix_timestamp($"TimeStamp", "yyyy-MM-dd'T'HH:mm:ss").cast("timestamp").desc)
    val latestForEachKey1 = tempReorder.withColumn("rank", row_number().over(windowSpec))
      .filter($"rank" === 1 && (($"UpdateReason_updateReasonId" =!= "null" && $"FFAction|!|" === "O|!|") || $"FFAction|!|" =!= "O|!|")).drop("rank")

192730230775    297 182 INC 500186  6   UpdateReasonToDelete    505074  3019685 I|!|    Japan   2017    2018-05-10T10:00:40+00:00

Но я ожидалвывод это.

192730230775    297 182 INC 500186  null    null    null    null    O|!|    Japan   2017    2018-05-10T10:11:15+00:00

Ответы [ 2 ]

0 голосов
/ 10 мая 2018

вот рабочий код для вас

import org.apache.spark.sql.expressions._
val windowSpec = Window.partitionBy("OrganizationID", "SourceID", "AuditorID").orderBy(unix_timestamp($"TimeStamp", "yyyy-MM-dd'T'HH:mm:ss").cast("timestamp").desc)
val latestForEachKey1 = finaldf.withColumn("rank", row_number().over(windowSpec))
  .filter($"rank" === 1 && (($"AuditorID" =!= "null" && $"FFAction|!|" === "O|!|") || $"FFAction|!|" =!= "O|!|"))

что должно дать вам

+-------------+-------------------------+--------------+--------+---------+--------------------+------------------+----------------+--------------------+-----------------------+-----------------------+-----------+-----------------------------------+--------------------------------+----------------------------------+------------------------------+----+
|DataPartition|TimeStamp                |OrganizationID|SourceID|AuditorID|AuditorEnumerationId|AuditorOpinionCode|AuditorOpinionId|IsPlayingAuditorRole|IsPlayingCSRAuditorRole|IsPlayingTaxAdvisorRole|FFAction|!||AuditorOpinionOnInternalControlCode|AuditorOpinionOnGoingConcernCode|AuditorOpinionOnInternalControlsId|AuditorOpinionOnGoingConcernId|rank|
+-------------+-------------------------+--------------+--------+---------+--------------------+------------------+----------------+--------------------+-----------------------+-----------------------+-----------+-----------------------------------+--------------------------------+----------------------------------+------------------------------+----+
|Japan        |2018-05-03T09:52:48+00:00|4295876589    |194     |2719     |3023331             |AOP               |3010542         |true                |false                  |true                   |O|!|       |null                               |null                            |null                              |null                          |1   |
|Japan        |2018-05-03T09:52:48+00:00|4295876589    |195     |16157    |1002485247          |UWE               |3010547         |true                |false                  |false                  |O|!|       |null                               |null                            |null                              |null                          |1   |
|Japan        |2018-05-03T07:36:47+00:00|4295876589    |196     |3252     |3024053             |ONC               |3020538         |true                |false                  |true                   |O|!|       |null                               |null                            |null                              |null                          |1   |
|Japan        |2018-05-03T07:36:47+00:00|4295876589    |195     |5937     |3026578             |NOP               |3010543         |true                |false                  |true                   |O|!|       |null                               |null                            |null                              |null                          |1   |
|Japan        |2018-05-03T08:10:19+00:00|4295876589    |196     |null     |null                |null              |null            |null                |null                   |null                   |D|!|       |null                               |null                            |null                              |null                          |1   |
+-------------+-------------------------+--------------+--------+---------+--------------------+------------------+----------------+--------------------+-----------------------+-----------------------+-----------+-----------------------------------+--------------------------------+----------------------------------+------------------------------+----+

Примечание: запись с sourceID 193 имеет o |! | и ноль, поэтому он не должен быть в выводе

0 голосов
/ 10 мая 2018

Вы можете использовать rownum udf для удаления дубликатов, отметив rownum = 1, а authorid не равен нулю

...