Как передать столбцы в качестве значения в UDF в Spark Scala для проверки состояния - PullRequest
0 голосов
/ 14 мая 2018

Вот мой фрейм данных

uniqueFundamentalSet    PeriodId    SourceId    StatementTypeCode   StatementCurrencyId UpdateReason_updateReasonId UpdateReasonComment UpdateReasonComment_languageId  UpdateReasonEnumerationId   FFAction|!| DataPartition   PartitionYear   TimeStamp
192730230775    297 182 INC 500186  null    null    null    null    O|!|    Japan   2017    2018-05-10T10:11:15+00:00
192730230775    297 180 INC 500186  6   InsertUpdateReason  505074  3019685 I|!|    Japan   2017    2018-05-10T10:00:40+00:00
192730230775    297 181 INC 500186  1   UpdateReason2Update 505074  3019680 I|!|    Japan   2017    2018-05-10T10:00:40+00:00
192730230775    297 182 INC 500186  6   UpdateReasonToDelete    505074  3019685 I|!|    Japan   2017    2018-05-10T10:00:40+00:00
192730230775    297 181 INC 500186  1   UpdateReason2UpdateIsNowUPdated 505074  3019680 I|!|    Japan   2017    2018-05-10T10:08:01+00:00
192730230775    297 181 INC 500186  4   New Reason Added    505074  3019683 I|!|    Japan   2017    2018-05-10T10:08:01+00:00
192730230775    297 180 INC 500186  6   InsertUpdateReason  505074  3019685 I|!|    Japan   2017    2018-05-10T09:57:29+00:00
192730230775    297 181 INC 500186  1   UpdateReason2Update 505074  3019680 I|!|    Japan   2017    2018-05-10T09:57:29+00:00
192730230775    297 182 INC 500186  6   UpdateReasonToDelete    505074  3019685 I|!|    Japan   2017    2018-05-10T09:57:29+00:00
192730230775    308 180 BAL 500186  1   RevisedReasonAdded  505074  3019680 O|!|    Japan   2017    2018-05-10T10:21:50+00:00
192730230775    308 180 BAL 500186  6   UpdateReasonToUpdateRevisedisNowUpdated 505074  3019685 O|!|    Japan   2017    2018-05-10T10:21:50+00:00
192730230775    308 180 BAL 500186  1   RevisedReasonAdded  505074  3019680 O|!|    Japan   2017    2018-05-10T10:27:09+00:00
192730230775    308 180 BAL 500186  6   UpdateReasonToUpdateRevisedisNowUpdated 505074  3019685 O|!|    Japan   2017    2018-05-10T10:27:09+00:00
192730230775    308 179 BAL 500186  null    null    null    null    O|!|    Japan   2017    2018-05-10T09:27:11+00:00
192730230775    308 181 BAL 500186  null    null    null    null    O|!|    Japan   2017    2018-05-10T10:27:09+00:00
192730230775    308 180 BAL 500186  1   RevisedReasonAdded  505074  3019680 O|!|    Japan   2017    2018-05-10T10:22:55+00:00
192730230775    308 180 BAL 500186  6   UpdateReasonToUpdateRevisedisNowUpdated 505074  3019685 O|!|    Japan   2017    2018-05-10T10:22:55+00:00
192730230775    308 180 BAL 500186  6   UpdateReasonToUpdateRevised 505074  3019685 I|!|    Japan   2017    2018-05-10T10:17:37+00:00
192730230775    308 181 BAL 500186  6   ReasonToDeleteRevised   505074  3019685 I|!|    Japan   2017    2018-05-10T10:17:37+00:00
192730230775    298 180 BAL 500186  6   UpdateReasonToUpdateRevised 505074  3019685 I|!|    Japan   2017    2018-05-10T10:17:37+00:00
192730230775    298 181 BAL 500186  6   ReasonToDeleteRevised   505074  3019685 I|!|    Japan   2017    2018-05-10T10:17:37+00:00
192730230775    298 180 BAL 500186  1   RevisedReasonAdded  505074  3019680 I|!|    Japan   2017    2018-05-10T10:22:55+00:00
192730230775    298 180 BAL 500186  6   UpdateReasonToUpdateRevisedisNowUpdated 505074  3019685 I|!|    Japan   2017    2018-05-10T10:22:55+00:00
192730230775    298 180 BAL 500186  6   UpdateReasonToUpdateRevised 505074  3019685 I|!|    Japan   2017    2018-05-10T10:16:31+00:00
192730230775    298 181 BAL 500186  6   ReasonToDeleteRevised   505074  3019685 I|!|    Japan   2017    2018-05-10T10:16:31+00:00
192730230775    298 180 BAL 500186  1   RevisedReasonAdded  505074  3019680 I|!|    Japan   2017    2018-05-10T10:21:50+00:00
192730230775    298 180 BAL 500186  6   UpdateReasonToUpdateRevisedisNowUpdated 505074  3019685 I|!|    Japan   2017    2018-05-10T10:21:50+00:00
192730230775    312 181 BAL 500186  null    null    null    null    O|!|    Japan   2018    2018-05-10T09:39:43+00:00
192730230775    310 181 INC 500186  null    null    null    null    D|!|    Japan   9999    2018-05-10T08:21:26+00:00
192730230775    310 182 INC 500186  null    null    null    null    O|!|    Japan   2018    2018-05-10T08:30:53+00:00
192730230775    298 181 BAL 500186  null    null    null    null    O|!|    Japan   2017    2018-05-10T10:22:55+00:00

Вот логика для получения ожидаемого результата

Если "FFAction |! |"=== "Я |! |"затем сгруппируйте по первым 6 столбцам, и необходимо получить последние данные на основе отметки времени.

If If "FFAction |! |"=== "O |! |"и $ "UpdateReason_updateReasonId" === "null" или "FFAction |! |"=== "D |! |"затем сгруппируйте по первым 5 столбцам, и необходимо получить последние данные на основе отметки времени.

Если в одной строке "FFAction |! |"=== "Я |! |"и еще один "FFAction |! |"=== "O |! |"в этом случае сгруппируйте по первым пяти столбцам и получите последний.

То же, что и в случае одной строки "FFAction |! |"=== "Я |! |"и еще один "FFAction |! |"=== "D |! |"в этом случае сгруппируйте по первым пяти столбцам и нужно получить последний.

Вот мой ожидаемый результат с объясненной логикой.

Logic Example 1:

Давайте возьмем пример PeridoId 308, который он имеет11 рядов.Теперь одна строка имеет PeriodId 308 и SourceId 179, и он полностью отличается, поэтому он будет выводиться.308 и 181 имеют две строки, идентичные до столбцов 5, и из этого есть O, поэтому нам нужно сгруппировать по 5 столбцам и принять последний и последний значения должны быть последними. 308 и 180 имеют 7 столбцов, похожих до строки 5, и он не имеет UpdateReason_updateReasonId какnull в этом случае group by должен быть в 6 столбцах.

И таким образом самый последний будет

192730230775    308 179 BAL 500186  null    null    null    null    O|!|    Japan   2017    2018-05-10T09:27:11+00:00
192730230775    308 181 BAL 500186  null    null    null    null    O|!|    Japan   2017    2018-05-10T10:27:09+00:00
192730230775    308 180 BAL 500186  6   UpdateReasonToUpdateRevisedisNowUpdated 505074  3019685 O|!|    Japan   2017    2018-05-10T10:22:55+00:00
192730230775    308 180 BAL 500186  1   RevisedReasonAdded  505074  3019680 O|!|    Japan   2017    2018-05-10T10:27:09+00:00

Так что это должен быть окончательный результат для PeriodId 308.

Logic Example 2 :

Similary PeriodId 297 имеет 9 столбцов.

Теперь у него есть три комбинации PeridoId 297 с SourceId 180 181 182. Таким образом, будет три строки. В этом 297 и 181 имеют аналогичные 5 столбцов, а SourceId не является нулевым, поэтому группадолжно быть на 6 столбцов.и поэтому у нас будет две уникальные записи, основанные на последней отметке времени.Точно так же 297 и 180 не имеют нулевого значения SourceId, поэтому сгруппируйте по 6 столбцам и, самое позднее, по отметке времени.

и, аналогично, 297 182 имеет три одинаковых строки, но имеет нулевой SourceId, поэтому для группировки по 5 столбцам необходимочтобы получить последнюю версию.

Итак, вот окончательный вывод для 297

192730230775    297 181 INC 500186  1   UpdateReason2Update 505074  3019680 I|!|    Japan   2017    2018-05-10T10:00:40+00:00
192730230775    297 180 INC 500186  6   InsertUpdateReason  505074  3019685 I|!|    Japan   2017    2018-05-10T10:00:40+00:00
192730230775    297 181 INC 500186  4   New Reason Added    505074  3019683 I|!|    Japan   2017    2018-05-10T10:08:01+00:00
192730230775    297 182 INC 500186  null    null    null    null    O|!|    Japan   2017    2018-05-10T10:11:15+00:00

Вот мой код, который делает то же самое, кроме последней логики

import org.apache.spark.sql.expressions._ import org.apache.spark.sql.functions ._

val windowSpec = Window.partitionBy("uniqueFundamentalSet", "PeriodId", "SourceId", "StatementTypeCode", "StatementCurrencyId")

val windowSpec2 = Window.partitionBy("uniqueFundamentalSet", "PeriodId", "SourceId", "StatementTypeCode", "StatementCurrencyId", "group").orderBy(unix_timestamp($"TimeStamp", "yyyy-MM-dd'T'HH:mm:ss").cast("timestamp").desc)


def containsActionUdf = udf {

  (array: Seq[String]) => (array.contains("O|!|") || array.contains("D|!|"))
}

val latestForEachKey2 = tempReorder.withColumn("group", when(containsActionUdf(collect_list("FFAction|!|").over(windowSpec)) && ($"UpdateReason_updateReasonId" === "null") , lit("same")).otherwise($"UpdateReason_updateReasonId"))
  .withColumn("rank", row_number().over(windowSpec2))
  .filter($"rank" === 1).drop("rank", "group")

Вот вывод, который я получаю с одной дополнительной строкой.

        +--------------------+--------+--------+-----------------+-------------------+---------------------------+---------------------------------------+------------------------------+-------------------------+-----------+-------------+-------------+-------------------------+
|uniqueFundamentalSet|PeriodId|SourceId|StatementTypeCode|StatementCurrencyId|UpdateReason_updateReasonId|UpdateReasonComment                    |UpdateReasonComment_languageId|UpdateReasonEnumerationId|FFAction|!||DataPartition|PartitionYear|TimeStamp                |
+--------------------+--------+--------+-----------------+-------------------+---------------------------+---------------------------------------+------------------------------+-------------------------+-----------+-------------+-------------+-------------------------+
|192730230775        |297     |181     |INC              |500186             |1                          |UpdateReason2UpdateIsNowUPdated        |505074                        |3019680                  |I|!|       |Japan        |2017         |2018-05-10T10:08:01+00:00|
|192730230775        |297     |181     |INC              |500186             |4                          |New Reason Added                       |505074                        |3019683                  |I|!|       |Japan        |2017         |2018-05-10T10:08:01+00:00|
|192730230775        |308     |179     |BAL              |500186             |null                       |null                                   |null                          |null                     |O|!|       |Japan        |2017         |2018-05-10T09:27:11+00:00|
|192730230775        |298     |181     |BAL              |500186             |6                          |ReasonToDeleteRevised                  |505074                        |3019685                  |I|!|       |Japan        |2017         |2018-05-10T10:17:37+00:00|
|192730230775        |298     |181     |BAL              |500186             |null                       |null                                   |null                          |null                     |O|!|       |Japan        |2017         |2018-05-10T10:22:55+00:00|
|192730230775        |297     |182     |INC              |500186             |6                          |UpdateReasonToDelete                   |505074                        |3019685                  |I|!|       |Japan        |2017         |2018-05-10T10:00:40+00:00|
|192730230775        |297     |182     |INC              |500186             |null                       |null                                   |null                          |null                     |O|!|       |Japan        |2017         |2018-05-10T10:11:15+00:00|
|192730230775        |308     |180     |BAL              |500186             |1                          |RevisedReasonAdded                     |505074                        |3019680                  |O|!|       |Japan        |2017         |2018-05-10T10:27:09+00:00|
|192730230775        |308     |180     |BAL              |500186             |6                          |UpdateReasonToUpdateRevisedisNowUpdated|505074                        |3019685                  |O|!|       |Japan        |2017         |2018-05-10T10:27:09+00:00|
|192730230775        |310     |181     |INC              |500186             |null                       |null                                   |null                          |null                     |D|!|       |Japan        |9999         |2018-05-10T08:21:26+00:00|
|192730230775        |308     |181     |BAL              |500186             |6                          |ReasonToDeleteRevised                  |505074                        |3019685                  |I|!|       |Japan        |2017         |2018-05-10T10:17:37+00:00|
|192730230775        |308     |181     |BAL              |500186             |null                       |null                                   |null                          |null                     |O|!|       |Japan        |2017         |2018-05-10T10:27:09+00:00|
|192730230775        |298     |180     |BAL              |500186             |1                          |RevisedReasonAdded                     |505074                        |3019680                  |I|!|       |Japan        |2017         |2018-05-10T10:22:55+00:00|
|192730230775        |298     |180     |BAL              |500186             |6                          |UpdateReasonToUpdateRevisedisNowUpdated|505074                        |3019685                  |I|!|       |Japan        |2017         |2018-05-10T10:22:55+00:00|
|192730230775        |312     |181     |BAL              |500186             |null                       |null                                   |null                          |null                     |O|!|       |Japan        |2018         |2018-05-10T09:39:43+00:00|
|192730230775        |310     |182     |INC              |500186             |null                       |null                                   |null                          |null                     |O|!|       |Japan        |2018         |2018-05-10T08:30:53+00:00|
|192730230775        |297     |180     |INC              |500186             |6                          |InsertUpdateReason                     |505074                        |3019685                  |I|!|       |Japan        |2017         |2018-05-10T10:00:40+00:00|
+--------------------+--------+--------+-----------------+-------------------+---------------------------+---------------------------------------+------------------------------+-------------------------+-----------+-------------+-------------+-------------------------+

Например, конечный результат должен быть .. Окончательный результат ..

192730230775    297 181 INC 500186  1   UpdateReason2Update 505074  3019680 I|!|    Japan   2017    2018-05-10T10:00:40+00:00
192730230775    297 180 INC 500186  6   InsertUpdateReason  505074  3019685 I|!|    Japan   2017    2018-05-10T10:00:40+00:00
192730230775    297 181 INC 500186  4   New Reason Added    505074  3019683 I|!|    Japan   2017    2018-05-10T10:08:01+00:00
192730230775    297 182 INC 500186  null    null    null    null    O|!|    Japan   2017    2018-05-10T10:11:15+00:00
192730230775    308 179 BAL 500186  null    null    null    null    O|!|    Japan   2017    2018-05-10T09:27:11+00:00
192730230775    308 181 BAL 500186  null    null    null    null    O|!|    Japan   2017    2018-05-10T10:27:09+00:00
192730230775    308 180 BAL 500186  6   UpdateReasonToUpdateRevisedisNowUpdated 505074  3019685 O|!|    Japan   2017    2018-05-10T10:22:55+00:00
192730230775    308 180 BAL 500186  1   RevisedReasonAdded  505074  3019680 O|!|    Japan   2017    2018-05-10T10:27:09+00:00
192730230775    298 180 BAL 500186  6   UpdateReasonToUpdateRevised 505074  3019685 I|!|    Japan   2017    2018-05-10T10:16:31+00:00
192730230775    298 180 BAL 500186  1   RevisedReasonAdded  505074  3019680 I|!|    Japan   2017    2018-05-10T10:22:55+00:00
192730230775    298 181 BAL 500186  null    null    null    null    O|!|    Japan   2017    2018-05-10T10:22:55+00:00
192730230775    312 181 BAL 500186  null    null    null    null    O|!|    Japan   2018    2018-05-10T09:39:43+00:00
192730230775    310 181 INC 500186  null    null    null    null    D|!|    Japan   9999    2018-05-10T08:21:26+00:00
192730230775    310 182 INC 500186  null    null    null    null    O|!|    Japan   2018    2018-05-10T08:30:53+00:00

1 Ответ

0 голосов
/ 15 мая 2018

После понимания вашей логики, кажется, что вы проверяли неправильные столбцы в функции udf.Вы должны проверять UpdateReason_updateReasonId на пустые значения следующим образом

import org.apache.spark.sql.expressions._
import org.apache.spark.sql.functions._

//window for checking if O|!| is present in the group
val windowSpec = Window.partitionBy("uniqueFundamentalSet", "PeriodId", "SourceId", "StatementTypeCode", "StatementCurrencyId")
//window for filtering out the latest after applying the group defined in previous window
val windowSpec2 = Window.partitionBy("uniqueFundamentalSet", "PeriodId", "SourceId", "StatementTypeCode", "StatementCurrencyId", "group").orderBy(unix_timestamp($"TimeStamp", "yyyy-MM-dd'T'HH:mm:ss").cast("timestamp").desc)

//udf to check if the group has O|!| or not
def containsUdf = udf{(array: Seq[String])=> array.contains("null") || array.contains("NULL") || array.contains(null)}

//applying the window and udf functions and filtering in the latest
val latestForEachKey1 = tempReorder.withColumn("group", when(containsUdf(collect_list("UpdateReason_updateReasonId").over(windowSpec)), lit("same")).otherwise($"UpdateReason_updateReasonId"))
                                    .withColumn("rank", row_number().over(windowSpec2))
                                    .filter($"rank" === 1).drop("rank", "group")
latestForEachKey1.show(false)

, что должно дать вам

+--------------------+--------+--------+-----------------+-------------------+---------------------------+---------------------------------------+------------------------------+-------------------------+-----------+-------------+-------------+--------------------------+
|uniqueFundamentalSet|PeriodId|SourceId|StatementTypeCode|StatementCurrencyId|UpdateReason_updateReasonId|UpdateReasonComment                    |UpdateReasonComment_languageId|UpdateReasonEnumerationId|FFAction|!||DataPartition|PartitionYear|TimeStamp                 |
+--------------------+--------+--------+-----------------+-------------------+---------------------------+---------------------------------------+------------------------------+-------------------------+-----------+-------------+-------------+--------------------------+
|192730230775        |297     |181     |INC              |500186             |1                          |UpdateReason2UpdateIsNowUPdated        |505074                        |3019680                  |I|!|       |Japan        |2017         |2018-05-10T10:08:01+00:00 |
|192730230775        |297     |181     |INC              |500186             |4                          |New Reason Added                       |505074                        |3019683                  |I|!|       |Japan        |2017         |2018-05-10T10:08:01+00:00 |
|192730230775        |308     |179     |BAL              |500186             |null                       |null                                   |null                          |null                     |O|!|       |Japan        |2017         |2018-05-10T09:27:11+00:00 |
|192730230775        |298     |181     |BAL              |500186             |null                       |null                                   |null                          |null                     |O|!|       |Japan        |2017         |2018-05-10T10:22:55+00:00 |
|192730230775        |297     |182     |INC              |500186             |null                       |null                                   |null                          |null                     |O|!|       |Japan        |2017         |2018-05-10T10:11:15+00:00 |
|192730230775        |308     |180     |BAL              |500186             |1                          |RevisedReasonAdded                     |505074                        |3019680                  |O|!|       |Japan        |2017         |2018-05-10T10:27:09+00:00 |
|192730230775        |308     |180     |BAL              |500186             |6                          |UpdateReasonToUpdateRevisedisNowUpdated|505074                        |3019685                  |O|!|       |Japan        |2017         |2018-05-10T10:27:09+00:000|
|192730230775        |310     |181     |INC              |500186             |null                       |null                                   |null                          |null                     |D|!|       |Japan        |9999         |2018-05-10T08:21:26+00:00 |
|192730230775        |308     |181     |BAL              |500186             |null                       |null                                   |null                          |null                     |O|!|       |Japan        |2017         |2018-05-10T10:27:09+00:00 |
|192730230775        |298     |180     |BAL              |500186             |1                          |RevisedReasonAdded                     |505074                        |3019680                  |I|!|       |Japan        |2017         |2018-05-10T10:22:55+00:00 |
|192730230775        |298     |180     |BAL              |500186             |6                          |UpdateReasonToUpdateRevisedisNowUpdated|505074                        |3019685                  |I|!|       |Japan        |2017         |2018-05-10T10:21:50+00:000|
|192730230775        |312     |181     |BAL              |500186             |null                       |null                                   |null                          |null                     |O|!|       |Japan        |2018         |2018-05-10T09:39:43+00:00 |
|192730230775        |310     |182     |INC              |500186             |null                       |null                                   |null                          |null                     |O|!|       |Japan        |2018         |2018-05-10T08:30:53+00:00 |
|192730230775        |297     |180     |INC              |500186             |6                          |InsertUpdateReason                     |505074                        |3019685                  |I|!|       |Japan        |2017         |2018-05-10T10:00:40+00:00 |
+--------------------+--------+--------+-----------------+-------------------+---------------------------+---------------------------------------+------------------------------+-------------------------+-----------+-------------+-------------+--------------------------+

Я предполагаю, что это ожидаемый результат.Я надеюсь, что ответ полезен

...