Искра DataFrame сокращает несколько записей на один идентификатор до наиболее частого значения - PullRequest
0 голосов
/ 25 марта 2020

Имеется таблица, подобная следующей:

+--+------------------+-----------+
|id|     diagnosis_age|  diagnosis|
+--+------------------+-----------+
| 1|2.1843037179180302| 315.320000|
| 1|  2.80033330216659| 315.320000|
| 1|   2.8222365762732| 315.320000|
| 1|  5.64822705794013| 325.320000|
| 1| 5.686557787521759| 335.320000|
| 2|  5.70572315231258| 315.320000|
| 2| 5.724888517103389| 315.320000|
| 3| 5.744053881894209| 315.320000|
| 3|5.7604813374292005| 315.320000|
| 3|  5.77993740687426| 315.320000|
+--+------------------+-----------+

Я пытаюсь сократить количество записей по идентификатору до одной, установив наиболее частый диагноз для этого идентификатора.

Если это были бы rdd, что-то вроде бы сделало это:

rdd.map(lambda x: (x["id"], [(x["diagnosis_age"], x["diagnosis"])]))\
.reduceByKey(lambda x, y: x + y)\
.map(lambda x: [i[1] for i in x[1]])\
.map(lambda x: [max(zip((x.count(i) for i in set(x)), set(x)))])

в sql:

select id, diagnosis, diagnosis_age
from (select id, diagnosis, diagnosis_age, count(*) as cnt,
             row_number() over (partition by id order by count(*) desc) as seqnum
      from t
      group by id, diagnosis, age
     ) da
where seqnum = 1;

желаемый результат:

+--+------------------+-----------+
|id|     diagnosis_age|  diagnosis|
+--+------------------+-----------+
| 1|2.1843037179180302| 315.320000|
| 2|  5.70572315231258| 315.320000|
| 3| 5.744053881894209| 315.320000|
+--+------------------+-----------+

Как мне достичь То же самое, если возможно, использовать только искровые операции с кадрами? В частности, без использования каких-либо действий RDD / sql.

Спасибо

Ответы [ 2 ]

1 голос
/ 25 марта 2020

Вы можете использовать count, max, first с оконными функциями и фильтровать по count=max.

from pyspark.sql import functions as F
from pyspark.sql.window import Window
w=Window().partitionBy("id","diagnosis").orderBy("diagnosis_age")
w2=Window().partitionBy("id")
df.withColumn("count", F.count("diagnosis").over(w))\
  .withColumn("max", F.max("count").over(w2))\
  .filter("count=max")\
  .groupBy("id").agg(F.first("diagnosis_age").alias("diagnosis_age"),F.first("diagnosis").alias("diagnosis"))\
  .orderBy("id").show()

+---+------------------+---------+
| id|     diagnosis_age|diagnosis|
+---+------------------+---------+
|  1|2.1843037179180302|   315.32|
|  2|  5.70572315231258|   315.32|
|  3| 5.744053881894209|   315.32|
+---+------------------+---------+
1 голос
/ 25 марта 2020

Python: Вот преобразование моего scala кода.

from pyspark.sql.functions import col, first, count, desc, row_number
from pyspark.sql import Window

df.groupBy("id", "diagnosis").agg(first(col("diagnosis_age")).alias("diagnosis_age"), count(col("diagnosis_age")).alias("cnt")) \
  .withColumn("seqnum", row_number().over(Window.partitionBy("id").orderBy(col("cnt").desc()))) \
  .where("seqnum = 1") \
  .select("id", "diagnosis_age", "diagnosis", "cnt") \
  .orderBy("id") \
  .show(10, False)

Scala: Ваш запрос не соответствует имеет смысл для меня. Условие groupBy приводит к тому, что счет для записи всегда будет 1. Я немного изменил выражение данных, например

import org.apache.spark.sql.expressions.Window

df.groupBy("id", "diagnosis").agg(first(col("diagnosis_age")).as("diagnosis_age"), count(col("diagnosis_age")).as("cnt"))
  .withColumn("seqnum", row_number.over(Window.partitionBy("id").orderBy(col("cnt").desc)))
  .where("seqnum = 1")
  .select("id", "diagnosis_age", "diagnosis", "cnt")
  .orderBy("id")
  .show(false)

, где результат:

+---+------------------+---------+---+
|id |diagnosis_age     |diagnosis|cnt|
+---+------------------+---------+---+
|1  |2.1843037179180302|315.32   |3  |
|2  |5.70572315231258  |315.32   |2  |
|3  |5.744053881894209 |315.32   |3  |
+---+------------------+---------+---+
...