Выберите имя столбца в строке для максимального значения в PySpark - PullRequest
1 голос
/ 31 мая 2019

У меня есть такой фрейм данных, показано только два столбца, однако в исходном фрейме данных много столбцов

data = [(("ID1", 3, 5)), (("ID2", 4, 12)), (("ID3", 8, 3))]
df = spark.createDataFrame(data, ["ID", "colA", "colB"])
df.show()

+---+----+----+
| ID|colA|colB|
+---+----+----+
|ID1|   3|   5|
|ID2|   4|  12|
|ID3|   8|   3|
+---+----+----+

Я хочу извлечь имя столбца в строке, которое имеет максимальное значение. Следовательно, ожидаемый результат такой:

+---+----+----+-------+
| ID|colA|colB|Max_col|
+---+----+----+-------+
|ID1|   3|   5|   colB|
|ID2|   4|  12|   colB|
|ID3|   8|   3|   colA|
+---+----+----+-------+

В случае связывания, когда colA и colB имеют одинаковое значение, выберите первый столбец.

Как мне этого добиться в pyspark

Ответы [ 5 ]

1 голос
/ 31 мая 2019

Вы можете использовать UDF в каждой строке для построчного вычисления и использовать struct для передачи нескольких столбцов в udf. Надеюсь, это поможет.

from pyspark.sql import functions as F
from pyspark.sql.types import IntegerType
data = [(("ID1", 3, 5,78)), (("ID2", 4, 12,45)), (("ID3", 8, 3,67))]
df = spark.createDataFrame(data, ["ID", "colA", "colB","colC"])
df.show()

+---+----+----+----+
| ID|colA|colB|colC|
+---+----+----+----+
|ID1|   3|   5|  78|
|ID2|   4|  12|  45|
|ID3|   8|   3|  67|
+---+----+----+----+
cols = df.columns
maxcol = F.udf(lambda row: max(row), IntegerType())
maxDF = df.withColumn("maxval", maxcol(F.struct([df[x] for x in df.columns[1:]])))
maxDF.show()

+---+----+----+----+-------+
|ID |colA|colB|colC|Max_col|
+---+----+----+----+-------+
|ID1|3   |5   |78  |78     |
|ID2|4   |12  |45  |45     |
|ID3|8   |3   |67  |67     |
+---+----+----+----+-------+
1 голос
/ 31 мая 2019

попробуйте следующее:

from  pyspark.sql import functions as F
data = [(("ID1", 3, 5)), (("ID2", 4, 12)), (("ID3", 8, 3))]
df = spark.createDataFrame(data, ["ID", "colA", "colB"])
df.withColumn('max_col',
   F.when(F.col('colA') > F.col('colB'), 'colA').
     otherwise('colB')).show()

Урожайность:

+---+----+----+-------+
| ID|colA|colB|max_col|
+---+----+----+-------+
|ID1|   3|   5|   colB|
|ID2|   4|  12|   colB|
|ID3|   8|   3|   colA|
+---+----+----+-------+
0 голосов
/ 31 мая 2019

Расширение того, что сделал Суреш .... возвращает соответствующее имя столбца

from pyspark.sql import functions as f
from pyspark.sql.types import IntegerType, StringType

import numpy as np

data = [(("ID1", 3, 5,78)), (("ID2", 4, 12,45)), (("ID3", 68, 3,67))]
df = spark.createDataFrame(data, ["ID", "colA", "colB","colC"])
df.show()

cols = df.columns
maxcol = f.udf(lambda row: cols[row.index(max(row)) +1], StringType())

maxDF = df.withColumn("Max_col", maxcol(f.struct([df[x] for x in df.columns[1:]])))
maxDF.show(truncate=False)

+---+----+----+----+------+
|ID |colA|colB|colC|Max_col|
+---+----+----+----+------+
|ID1|3   |5   |78  |colC  |
|ID2|4   |12  |45  |colC  |
|ID3|68  |3   |67  |colA  |
+---+----+----+----+------+
0 голосов
/ 31 мая 2019

Вы можете использовать RDD API для добавления нового столбца:

df.rdd.map(lambda r: r.asDict())\
       .map(lambda r: Row(Max_col=max([i for i in r.items() if i[0] != 'ID'], 
                                      key=lambda kv: kv[1])[0], **r) )\
       .toDF()

В результате:

+---+-------+----+----+
| ID|Max_col|colA|colB|
+---+-------+----+----+
|ID1|   colB|   3|   5|
|ID2|   colB|   4|  12|
|ID3|   colA|   8|   3|
+---+-------+----+----+
0 голосов
/ 31 мая 2019

Есть несколько вариантов для достижения этой цели. Я приведу пример для одного и могу дать подсказку для отдыха-

from pyspark.sql import functions as F
from pyspark.sql.window import Window as W
from pyspark.sql import types as T

data = [(("ID1", 3, 5)), (("ID2", 4, 12)), (("ID3", 8, 3))]
df = spark.createDataFrame(data, ["ID", "colA", "colB"])
df.show()

+---+----+----+
| ID|colA|colB|
+---+----+----+
|ID1|   3|   5|
|ID2|   4|  12|
|ID3|   8|   3|
+---+----+----+

#Below F.array creates an array of column name and value pair like [['colA', 3], ['colB', 5]] then F.explode break this array into rows like different column and value pair should be in different rows

df = df.withColumn(
    "max_val",
    F.explode(
        F.array([
            F.array([F.lit(cl), F.col(cl)]) for cl in df.columns[1:]
        ])
    )
)
df.show()
+---+----+----+----------+
| ID|colA|colB|   max_val|
+---+----+----+----------+
|ID1|   3|   5| [colA, 3]|
|ID1|   3|   5| [colB, 5]|
|ID2|   4|  12| [colA, 4]|
|ID2|   4|  12|[colB, 12]|
|ID3|   8|   3| [colA, 8]|
|ID3|   8|   3| [colB, 3]|
+---+----+----+----------+

#Then select columns so that column name and value should be in different columns
df = df.select(
    "ID", 
    "colA", 
    "colB", 
    F.col("max_val").getItem(0).alias("col_name"),
    F.col("max_val").getItem(1).cast(T.IntegerType()).alias("col_value"),
)
df.show()
+---+----+----+--------+---------+
| ID|colA|colB|col_name|col_value|
+---+----+----+--------+---------+
|ID1|   3|   5|    colA|        3|
|ID1|   3|   5|    colB|        5|
|ID2|   4|  12|    colA|        4|
|ID2|   4|  12|    colB|       12|
|ID3|   8|   3|    colA|        8|
|ID3|   8|   3|    colB|        3|
+---+----+----+--------+---------+

# Rank column values based on ID in desc order
df = df.withColumn(
    "rank",
    F.rank().over(W.partitionBy("ID").orderBy(F.col("col_value").desc()))
)
df.show()
+---+----+----+--------+---------+----+
| ID|colA|colB|col_name|col_value|rank|
+---+----+----+--------+---------+----+
|ID2|   4|  12|    colB|       12|   1|
|ID2|   4|  12|    colA|        4|   2|
|ID3|   8|   3|    colA|        8|   1|
|ID3|   8|   3|    colB|        3|   2|
|ID1|   3|   5|    colB|        5|   1|
|ID1|   3|   5|    colA|        3|   2|
+---+----+----+--------+---------+----+

#Finally Filter rank = 1 as max value have rank 1 because we ranked desc value
df.where("rank=1").show()
+---+----+----+--------+---------+----+
| ID|colA|colB|col_name|col_value|rank|
+---+----+----+--------+---------+----+
|ID2|   4|  12|    colB|       12|   1|
|ID3|   8|   3|    colA|        8|   1|
|ID1|   3|   5|    colB|        5|   1|
+---+----+----+--------+---------+----+

Другие опции -

  • Используйте UDF для вашего базового df и возвращайте имя столбца с максимальным значением
  • В том же примере после создания столбца с именем и значением столбца вместо ранга используйте группу ID, возьмите макс. col_value. Затем присоединитесь к предыдущему дф.
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...