GroupBy с условием на агрегат Spark / Scala - PullRequest
0 голосов
/ 02 октября 2019

У меня есть такой фрейм данных:

|   ID_VISITE_CALCULE|       TAG_TS_TO_TS|EXTERNAL_PERSON_ID|EXTERNAL_ORGANISATION_ID| RK|
+--------------------+-------------------+------------------+------------------------+---+
|GA1.2.1023040287....|2019-04-23 11:24:19|            dupont|                    null|  1|
|GA1.2.1023040287....|2019-04-23 11:24:19|            durand|                    null|  2|
|GA1.2.105243141.1...|2019-04-23 11:21:01|              null|                    null|  1|
|GA1.2.1061963529....|2019-04-23 11:12:19|              null|                    null|  1|
|GA1.2.1065635192....|2019-04-23 11:07:14|            antoni|                    null|  1|
|GA1.2.1074357108....|2019-04-23 11:11:34|              lang|                    null|  1|
|GA1.2.1074357108....|2019-04-23 11:12:37|              lang|                    null|  2|
|GA1.2.1075803022....|2019-04-23 11:28:38|            cavail|                    null|  1|
|GA1.2.1080137035....|2019-04-23 11:20:00|              null|                    null|  1|
|GA1.2.1081805479....|2019-04-23 11:10:49|              null|                    null|  1|
|GA1.2.1081805479....|2019-04-23 11:10:49|            linare|                    null|  2|
|GA1.2.1111218536....|2019-04-23 11:28:43|              null|                    null|  1|
|GA1.2.1111218536....|2019-04-23 11:32:26|              null|                    null|  2|
|GA1.2.1111570355....|2019-04-23 11:07:00|              null|                    null|  1|
+--------------------+-------------------+------------------+------------------------+---+

Я пытаюсь применить правила для агрегирования по ID_VISITE_CALCULE и сохранить только одну строку для идентификатора.

Для идентификатора (группы) я бы хотел:

  • получить первую временную метку группы и сохранить ее в столбце START

  • получить последнюю временную метку группы и сохранить ее в столбце END

  • проверить, является ли EXTERNAL_PERSON_ID одинаковым для всей группы. Если это так, и это NULL, то я пишу NULL, если это так, и это имя, то я пишу имя. Наконец, если в группе есть разные значения, я регистрирую UNDEFINED

  • и применяю точно такие же правила к столбцу EXTERNAL_ORGANIZATION_ID
RESULT :
+--------------------+------------------+------------------------+-------------------+-------------------+
|   ID_VISITE_CALCULE|EXTERNAL_PERSON_ID|EXTERNAL_ORGANISATION_ID|              START|                END|
+--------------------+------------------+------------------------+-------------------+-------------------+
|GA1.2.1023040287....|         undefined|                    null|2019-04-23 11:24:19|2019-04-23 11:24:19|
|GA1.2.105243141.1...|              null|                    null|2019-04-23 11:21:01|2019-04-23 11:21:01|
|GA1.2.1061963529....|              null|                    null|2019-04-23 11:12:19|2019-04-23 11:12:19|
|GA1.2.1065635192....|            antoni|                    null|2019-04-23 11:07:14|2019-04-23 11:07:14|
|GA1.2.1074357108....|              lang|                    null|2019-04-23 11:11:34|2019-04-23 11:12:37|
|GA1.2.1075803022....|            cavail|                    null|2019-04-23 11:28:38|2019-04-23 11:28:38|
|GA1.2.1080137035....|              null|                    null|2019-04-23 11:20:00|2019-04-23 11:20:00|
|GA1.2.1081805479....|         undefined|                    null|2019-04-23 11:10:49|2019-04-23 11:10:49|
|GA1.2.1111218536....|              null|                    null|2019-04-23 11:28:43|2019-04-23 11:32:26|
|GA1.2.1111570355....|              null|                    null|2019-04-23 11:07:00|2019-04-23 11:07:00|
+--------------------+------------------+------------------------+-------------------+-------------------+

В моем примере у меня есть толькоМаксимум 2 строки для группы, но в реальном наборе данных у меня может быть несколько сотен строк в группе.

Спасибо за вашу любезную помощь.

Ответы [ 2 ]

0 голосов
/ 03 октября 2019

Все можно выполнить одним групповым вызовом, однако я бы посоветовал для (незначительного) выигрыша в производительности и для удобства чтения кода разделиться на 2 вызова:

import org.apache.spark.sql.functions.{col, size, collect_set, max, min, when, lit}

val res1DF = df.groupBy(col("ID_VISITE_CALCULE")).agg(
 min(col("START")).alias("START"),
 max(col("END")).alias("END"),
 collect_set(col("EXTERNAL_PERSON_ID")).alias("EXTERNAL_PERSON_ID"),
 collect_set(col("EXTERNAL_ORGANIZATION_ID")).alias("EXTERNAL_ORGANIZATION_ID")
)

val res2DF = res1DF.withColumn("EXTERNAL_PERSON_ID",
 when(
  size(col("EXTERNAL_PERSON_ID")) > 1, 
  lit("UNDEFINED")).otherwise(col("EXTERNAL_PERSON_ID").getItem(0)
 )
).withColumn("EXTERNAL_ORGANIZATION_ID",
 when(
  size(col("EXTERNAL_ORGANIZATION_ID")) > 1, 
  lit("UNDEFINED")).otherwise(col("EXTERNAL_ORGANIZATION_ID").getItem(0)
 )
)

Метод getItemвыполняет большинство условий в фоновом режиме. Если набор значений пуст, он вернет null, а если имеется только одно значение, он вернет значение.

0 голосов
/ 03 октября 2019

/ Было бы хорошо, если вы покажете какой-нибудь код / ​​Пример данных, из которого построен фрейм данных.

Предполагая, что у вас есть фрейм данных как tableDf

** Spark Sql Solution **

tableDf.createOrReplaceTempView("input_table")
val sqlStr ="""
    select ID_VISITE_CALCULE,
           (case when count(distinct person_id_calculation) > 1 then "undefined"
                when count(distinct person_id_calculation) = 1 and 
                     max(person_id_calculation) = "noNull" then ""
                else max(person_id_calculation)) as EXTERNAL_PERSON_ID,
         -- do the same for EXTERNAL_ORGANISATION_ID
        max(start_v) as start_v, max(last_v) as last_v
from
(select ID_VISITE_CALCULE,
           ( case
               when nvl(EXTERNAL_PERSON_ID,"noNull") = 
                    lag(EXTERNAL_PERSON_ID,1,"noNull")over(partition by 
                        ID_VISITE_CALCULE order by TAG_TS_TO_TS) then 
                        EXTERNAL_PERSON_ID
               else "undefined" end ) AS person_id_calculation,
            -- Same calculation for EXTERNAL_ORGANISATION_ID
            first(TAG_TS_TO_TS) over(partition by ID_VISITE_CALCULE order by 
                              TAG_TS_TO_TS) as START_V,
           last(TAG_TS_TO_TS) over(partition by ID_VISITE_CALCULE order by 
                              TAG_TS_TO_TS) as last_V 
           from input_table ) a
   group by 1
"""
val resultDf = spark.sql(sqlStr)
...