pyspark фильтрует фрейм данных, используя минимальное значение для каждого идентификатора - PullRequest
1 голос
/ 24 марта 2020

Имеется таблица, подобная следующей:

+--+------------------+-----------+
|id|     diagnosis_age|  diagnosis|
+--+------------------+-----------+
| 1|2.1843037179180302| 315.320000|
| 1|  2.80033330216659| 315.320000|
| 1|   2.8222365762732| 315.320000|
| 1|  5.64822705794013| 325.320000|
| 1| 5.686557787521759| 335.320000|
| 2|  5.70572315231258| 315.320000|
| 2| 5.724888517103389| 315.320000|
| 3| 5.744053881894209| 315.320000|
| 3|5.7604813374292005| 315.320000|
| 3|  5.77993740687426| 315.320000|
+--+------------------+-----------+

Я пытаюсь уменьшить количество записей по идентификатору, рассматривая только диагнозы с наименьшим возрастом диагноза для идентификатора. В SQL вы бы присоединили таблицу к себе, что-то вроде:

SELECT a.id, a.diagnosis_age, a.diagnosis
    FROM tbl1 a
INNER JOIN
(SELECT id, MIN(diagnosis_age) AS min_diagnosis_age
    FROM tbl1
        GROUP BY id) b
ON b.id = a.id
WHERE b.min_diagnosis_age = a.diagnosis_age

Если бы это был rdd, вы могли бы сделать что-то вроде:

rdd.map(lambda x: (x["id"], [(x["diagnosis_age"], x["diagnosis"])]))\
.reduceByKey(lambda x, y: x + y)\
.map(lambda x: (x[0], [i for i in x[1] if i[0] == min(x[1])[0]]))

Как бы вы достигли того же используя только операции искровых данных? Если это возможно? В частности, нет операций sql / rdd.

спасибо

1 Ответ

2 голосов
/ 24 марта 2020

Вы можете использовать window с функцией first, а затем filter из всех остальных.

from pyspark.sql import functions as F
from pyspark.sql.window import Window
w=Window().partitionBy("id").orderBy("diagnosis_age")
df.withColumn("least_age", F.first("diagnosis_age").over(w))\
.filter("diagnosis_age=least_age").drop("least_age").show()

+---+------------------+---------+
| id|     diagnosis_age|diagnosis|
+---+------------------+---------+
|  1|2.1843037179180302|   315.32|
|  3| 5.744053881894209|   315.32|
|  2|  5.70572315231258|   315.32|
+---+------------------+---------+

Вы также можете сделать это без оконной функции, используйте groupBy min и first:

from pyspark.sql import functions as F
df.orderBy("diagnosis_age").groupBy("id")\
.agg(F.min("diagnosis_age").alias("diagnosis_age"), F.first("diagnosis").alias("diagnosis"))\
.show()
+---+------------------+---------+
| id|     diagnosis_age|diagnosis|
+---+------------------+---------+
|  1|2.1843037179180302|   315.32|
|  3| 5.744053881894209|   315.32|
|  2|  5.70572315231258|   315.32|
+---+------------------+---------+

Примечание , который я заказываю diagnosis_age до groupyBy, чтобы обработать те случаи, когда требуемое диагностическое значение не отображается в первом ряду группы . Однако , если ваши данные уже упорядочены по diagnosis_age, вы можете использовать вышеуказанный код без orderBy.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...