Агрегат PySpark и условия - PullRequest
0 голосов
/ 18 марта 2019

У меня есть вопрос по PySpark.

df = (sc.parallelize([
    {"Day":1,"sensitive_id":"1234", "num":3},
    {"Day":1,"sensitive_id":"1234", "num":3}, 
    {"Day":2,"sensitive_id":"1234", "num":3},
    {"Day":3,"sensitive_id":"2345", "num":2},
    {"Day":3,"sensitive_id":"2345", "num":2},
    {"Day":3,"sensitive_id":"6789", "num":4},
    {"Day":4,"sensitive_id":"6789", "num":4},
    {"Day":4,"sensitive_id":"6789", "num":4},
    {"Day":4,"sensitive_id":"6789", "num":4}
 ]).toDF()
      )

enter image description here

Я хочу, чтобы новый столбец имел соответствующий «Sens__ID» в качествемаксимальное значение столбца "num".

Это результат, который я имею до сих пор.

from pyspark.sql import functions as F
from pyspark.sql.window import Window
(
    df 
    .withColumn(
        'top_sensitive_id',
        F.when
        (
            F.col('num') == F.max(F.col('num')).over(Window.partitionBy(F.col('Day'))),
            F.col('sensitive_id')
        )
    )
    .withColumn
    (
        'top',
        F.max(F.col('top_sensitive_id')).over(Window.partitionBy(F.col('Day')))
    )

    .show()
)

enter image description here

Но яконечно, должен быть более изящный и эффективный способ сделать это.

Может кто-нибудь предложить лучший способ сделать это?

1 Ответ

1 голос
/ 25 марта 2019

Ваш код почти близок к лучшему подходу, но я все же попытался добавить несколько вещей -

1.Рассчитать 'top' один раз и использовать его для сравнения.

2. Использовать отдельноопределение столбца, это поможет улучшить читаемость и удобство обслуживания

 from pyspark.sql.window import Window

windowSpec = Window.partitionBy('Day')
top = max('sensitive_id').over(windowSpec).alias('top')

df.select('Day','Num','sensitive_id',top).withColumn('top_sensitive_id', expr("CASE WHEN sensitive_id = top THEN top END")).show()
...