Как выбрать «лучшее соответствие» в Apache Spark при агрегировании в DataFrame? - PullRequest
0 голосов
/ 08 января 2020

Предположим, у меня есть эти данные в кадре данных Spark:

Person   Position
-----------------
Tom      Gold
Tom      Silver
Tom      Silver
Dick     Silver
Dick     Silver
Dick     null
Harry    Gold
Harry    Silver
Alice    Bronze
Alice    null
Bob      null
Bob      null

Это то, чего я хочу достичь в своем выводе:

Person   BestPosition
-----------------
Tom      Gold
Dick     Silver
Harry    Gold
Alice    Bronze
Bob      null

Это не реальный сценарий, но это довольно хорошее приближение. Порядок ранжирования позиций может быть жестко заданным или заданным в конфигурации, я не возражаю (в реальном сценарии их всего 3 или 4, и они не изменятся).

Если бы я делал это в C# Я думаю, это выглядело бы примерно так:

var data = new List<int>{}.Select(x => new { Name = "Tom",  Position = "Gold" },  ... etc);
var aggregation = data
    .GroupBy(scores => scores.Name)
    .Select(grouping => new {
        Name = grouping.Key,
        BestPosition = new [] {"Gold", "Silver", "Bronze"}.FirstOrDefault(x => grouping.Any(score => score.Position == x))
    })
    .ToList();

Я пишу свое приложение в Spark для. NET (что в ретроспективе было плохим решением, которое я не могу позволить себе изменить направление сейчас). Я ценю, что никто другой не использует Spark для. NET, но здесь я борюсь с концепцией, а не с кодом, поэтому, если кто-то сможет решить эту проблему в Scala / Java / Python, я почти уверен, что смогу конвертировать.

Я довольно новичок в Spark, поэтому решение может быть очевидным, но я не уверен, как go об этом.

Ответы [ 2 ]

1 голос
/ 08 января 2020

Другое решение с использованием collect_set и array_contains с groupBy:

df.groupBy($"Person").agg(collect_set(col("Position")).alias("Position")) 
  .withColumn("Position", when(array_contains($"Position", "Gold"), "Gold") 
                          .when(array_contains($"Position", "Silver"), "Silver") 
                          .when(array_contains($"Position", "Bronze"), "Bronze")
             ) 
  .show()

Дает:

+------+--------+
|Person|Position|
+------+--------+
|   Tom|    Gold|
|  Dick|  Silver|
|   Bob|    null|
| Alice|  Bronze|
| Harry|    Gold|
+------+--------+
1 голос
/ 08 января 2020

PySpark sql решение. Список приоритетов можно установить в предложении order by оконной функции, которое затем можно использовать для выбора лучшей строки для человека.

from pyspark.sql.functions import row_number,when,col
from pyspark.sql import Window
w = Window.partitionBy(col('Person')).orderBy(when(col('Position') == 'Gold',1)
                                              .when(col('Position') == 'Silver',2)
                                              .when(col('Position') == 'Bronze',3)
                                              .otherwise(4)
                                             )
rnum_df = df.withColumn('rnum',row_number().over(w))
result = rnum_df.filter(col('rnum') == 1).select(df.columns)
result.show()
...