SQL-запрос Pyspark для получения строк, которые составляют +/- 20% от определенного столбца - PullRequest
3 голосов
/ 07 марта 2019

У меня есть следующий pyspark df:

+------------------+--------+-------+
|                ID|  Assets|Revenue|
+------------------+--------+-------+
|201542399349300619| 1633944|  32850|
|201542399349300629| 3979760| 850914|
|201542399349300634| 3402687|1983568|
|201542399349300724| 1138291|1097553|
|201522369349300122| 1401406|1010828|
|201522369349300137|   16948| 171534|
|201522369349300142|13474056|2285323|
|201522369349300202|  481045| 241788|
|201522369349300207|  700861|1185640|
|201522369349300227|  178479| 267976|
+------------------+--------+-------+

Для каждой строки я хочу иметь возможность получать строки, которые находятся в пределах 20% от суммы активов. Например, для первой строки (ID = 201542399349300619) я хочу, чтобы все строки с активами находились в пределах 20% +/- 1 633 944 (т. Е. От 1 307 155 до 1 960 732):

+------------------+--------+-------+
|                ID|  Assets|Revenue|
+------------------+--------+-------+
|201542399349300619| 1633944|  32850|
|201522369349300122| 1401406|1010828|

Используя эту таблицу с подмножествами, я хочу получить средние активы и добавить ее в качестве нового столбца. Таким образом, для приведенного выше примера это будут средние активы (1633944 + 1401406) = 1517675

+------------------+--------+-------+---------+
|                ID|  Assets|Revenue|AvgAssets|
+------------------+--------+-------+---------+
|201542399349300619| 1633944|  32850|  1517675|

1 Ответ

4 голосов
/ 07 марта 2019

Предполагая, что ваш DataFrame имеет схему, подобную следующей (т. Е. Assets и Revenue являются числовыми):

df.printSchema()
#root
# |-- ID: long (nullable = true)
# |-- Assets: integer (nullable = true)
# |-- Revenue: integer (nullable = true)

Вы можете присоединить DataFrame к себе наусловие, которое вы изложили.После объединения вы можете группировать и агрегировать, беря среднее значение столбца Assets.

Например:

from pyspark.sql.functions import avg, expr

df.alias("l")\
    .join(
        df.alias("r"), 
        on=expr("r.assets between l.assets*0.8 and l.assets*1.2")
    )\
    .groupBy("l.ID", "l.Assets", "l.Revenue")\
    .agg(avg("r.Assets").alias("AvgAssets"))\
    .show()
#+------------------+--------+-------+------------------+
#|                ID|  Assets|Revenue|         AvgAssets|
#+------------------+--------+-------+------------------+
#|201542399349300629| 3979760| 850914|         3691223.5|
#|201522369349300202|  481045| 241788|          481045.0|
#|201522369349300207|  700861|1185640|          700861.0|
#|201522369349300137|   16948| 171534|           16948.0|
#|201522369349300142|13474056|2285323|       1.3474056E7|
#|201522369349300227|  178479| 267976|          178479.0|
#|201542399349300619| 1633944|  32850|         1517675.0|
#|201522369349300122| 1401406|1010828|1391213.6666666667|
#|201542399349300724| 1138291|1097553|         1138291.0|
#|201542399349300634| 3402687|1983568|         3691223.5|
#+------------------+--------+-------+------------------+

Поскольку мы присоединяем DataFrame к себе, мы можем использоватьпсевдонимы для ссылки на левую таблицу ("l") и правую таблицу ("r").Приведенная выше логика гласит: присоедините l к r при условии, что активы в r составляют + / 20% активов в l.

Существует несколько способов выразить + /Условие 20%, но я использую выражение spark-sql between, чтобы найти строки от Assets * 0.8 до Assets * 1.2.

Затем мы агрегируем по всем столбцам (groupBy)левая таблица и усреднение по активам в правой таблице.

Результирующий столбец AvgAssets является столбцом FloatType, но вы можете легко преобразовать его в IntegerType, добавив .cast("int") перед.alias("AvgAssets") если это то, что вы предпочитаете.


См. Также:

...