PySpark - выбрать отдельные строки на основе максимального значения из другого столбца - PullRequest
0 голосов
/ 14 февраля 2020

Я пишу свою первую искровую работу и застрял в ситуации, которую не знаю, как решить с помощью Spark. У меня есть список, подобный этому:

mylist = [
  {"id":1,"name":"sally", "role": "manager", "updated_at": "2020-01-13T18:42:21Z"},
  {"id":1,"name":"sally", "role": "director", "updated_at": "2020-01-14T10:17:45Z"},
  {"id":2,"name":"eric", "role": "leader", "updated_at": "2020-01-28T07:33:58Z"},
  {"id":3,"name":"john", "role": "leader", "updated_at": "2020-01-01T20:00:00Z"},
  {"id":2,"name":"eric", "role": "developer", "updated_at": "2020-01-10T08:28:31Z"},
]

df = spark.createDataFrame(Row(**x) for x in mylist).show(truncate=False)

df.show(n=10)

Мне интересно, можно ли отфильтровать этот фрейм данных и получить отдельные строки (уникальные идентификаторы) на основе max updated_at. Максимальное значение updated_at представляет последний статус каждого сотрудника.

Ожидаемый результат из приведенного выше примера:

  {"id":1,"name":"sally", "role": "director", "updated_at": "2020-01-14T10:17:45Z"},
  {"id":2,"name":"eric", "role": "leader", "updated_at": "2020-01-28T07:33:58Z"},
  {"id":3,"name":"john", "role": "leader", "updated_at": "2020-01-01T20:00:00Z"},

Если это возможно? На какую функцию мне смотреть?

1 Ответ

4 голосов
/ 14 февраля 2020

Попробуйте использовать функцию row_number()-window и получите только latest record для уникальных. Затем создайте json output для фрейма данных.

Example:

from pyspark.sql.functions import *
from pyspark.sql import *
from pyspark.sql.window import Window

mylist = [
  {"id":1,"name":"sally", "role": "manager", "updated_at": "2020-01-13T18:42:21Z"},
  {"id":1,"name":"sally", "role": "director", "updated_at": "2020-01-14T10:17:45Z"},
  {"id":2,"name":"eric", "role": "leader", "updated_at": "2020-01-28T07:33:58Z"},
  {"id":3,"name":"john", "role": "leader", "updated_at": "2020-01-01T20:00:00Z"},
  {"id":2,"name":"eric", "role": "developer", "updated_at": "2020-01-10T08:28:31Z"},
]

df = spark.createDataFrame(Row(**x) for x in mylist)

w =  Window.partitionBy("id").orderBy(desc("updated_at"))

df1=df.withColumn("rn", row_number().over(w)).filter(col("rn") == 1).drop("rn")

df1.toJSON().collect()

#result
#[u'{"id":1,"name":"sally","role":"director","updated_at":"2020-01-14T10:17:45Z"}', 
#u'{"id":2,"name":"eric","role":"leader","updated_at":"2020-01-28T07:33:58Z"}', 
#u'{"id":3,"name":"john","role":"leader","updated_at":"2020-01-01T20:00:00Z"}']

#write to HDFS..etc filesystems
#only use repartition with less data or else use coalesce(1)

df1.repartition(1).write.mode("overwrite").format("json").save("<path>")
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...