PySpark эффективный способ N самых больших элементов - PullRequest
0 голосов
/ 08 ноября 2019

My dataset Итак, я должен получить n (по умолчанию 3) самых больших элементов из этого набора данных. Как мне сделать это в PySpark приемлемым способом? Я знаю, как это сделать в Pandas, но мне хотелось бы знать, как это эффективно сделать в PySpark или можно ли это сделать эффективно. Моей первой идеей было использование наибольшего из pyspark.sql.functions, подобного этому

ls = []
cols = df_tmp.columns[:-1]
for j in cols:
        max_v = df_tmp.where(df_tmp["Variable"] == j).select(F.greatest(*[F.col(col) for col in cols]))
        ls.append(max_v.collect()[0][0])
return ls.max

Но, похоже, это очень плохой подход, поскольку он возвращает наибольшее значение (0,984), а не комбинацию (ЧарлиФокстрот). Также я не вижу, как получить второе по величине значение без перезаписи значения в ячейке (Чарли, Фокстрот), что, как я думал, вам не следует делать в PySpark.

Спасибо, что прочитали это и особенновсем, кто может ответить:)

1 Ответ

0 голосов
/ 09 ноября 2019

Вы можете объединить все столбцы от Alpha до Foxtrot, чтобы создать фрейм данных с тремя столбцами (числовое значение, столбец переменной, имя столбца значения). Пожалуйста, посмотрите на приведенный ниже пример:

import random
#creating a dataframe similiar to yours
columns = ['A','B','C','D','E','F']
l = [[random.random() if c!=r else None for c in range(6)] for r in range(6)]
l = [x + [columns[i]] for i,x in enumerate(l)]

df=spark.createDataFrame(l, columns)
df.show()

Выход:


+-------------------+--------------------+--------------------+--------------------+-------------------+-------------------+---+
|                  A|                   B|                   C|                   D|                  E|                  F| _7|
+-------------------+--------------------+--------------------+--------------------+-------------------+-------------------+---+
|               null| 0.37958341713258026| 0.31880755415785833|  0.8908555547489883|0.41632799280431776| 0.0729721304772899|  A|
|0.21814744903713268|                null|0.024393462170815394|  0.9940573571339111| 0.7841527980918188|  0.194722179975509|  B|
|  0.786507586894131|  0.9155528558183477|                null|  0.5782381547037391| 0.9714912596343181| 0.5446460767903856|  C|
| 0.9108497603580163|  0.5088891113970719| 0.35594300627798736|                null|  0.514258802933162|0.19317616393798986|  D|
|  0.193214269992123|  0.6259176088252493|  0.4425532657461867|0.050484163355697276|               null| 0.6594661109179668|  E|
| 0.5567272189587709|0.020606558131312402| 0.21905184240270814|  0.2817064382900445| 0.5409619970394691|               null|  F|
+-------------------+--------------------+--------------------+--------------------+-------------------+-------------------+---+
import pyspark.sql.functions as F

newdf = df.select(F.col('A').alias('value'), F.col('_7').alias('row'), F.lit('A').alias('column'))

for col in columns[1:]:
    newdf = newdf.union(df.select(col, '_7', F.lit(col)))

newdf.orderBy(newdf.value.desc()).show(3)

Выход:

+------------------+---+------+
|             value|row|column|
+------------------+---+------+
|0.9940573571339111|  B|     D|
|0.9714912596343181|  C|     E|
|0.9155528558183477|  C|     B|
+------------------+---+------+
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...