Spark DataFrame: получить отсортированные по строке имена столбцов на основе значений столбцов - PullRequest
1 голос
/ 02 июля 2019

Для каждой строки в приведенном ниже кадре данных я хочу найти имена столбцов (в виде массива, кортежа или чего-то еще) в соответствии с записями по убыванию столбцов.Итак, для фрейма данных

+---+---+---+---+---+
| ID|key|  a|  b|  c|
+---+---+---+---+---+
|  0|  1|  5|  2|  1|
|  1|  1|  3|  4|  5|
+---+---+---+---+---+

я хочу найти

+---+---+---+---+---+------------------+
| ID|key|  a|  b|  c|descending_columns|
+---+---+---+---+---+------------------+
|  0|  1|  5|  2|  1|           [a,b,c]|
|  1|  1|  3|  4|  5|           [c,b,a]|
+---+---+---+---+---+------------------+

В идеале и вообще я хочу иметь возможность перебирать предварительно определенные столбцы и применять функцию на основе этихзаписи в столбцах.Это может выглядеть так:

import pyspark.sql.functions as f

name_cols = ["a","b","c"]

for col in name_cols: 
    values_ls.append = []
    ...schema specification....
    values_ls.append(f.col(col) ...get column value... )

df1 = df.withColumn("descending_columns", values_ls)

Вопрос довольно прост, но, кажется, довольно сложно реализовать его в pyspark.

Я использую pyspark версии 2.3.3.

Ответы [ 2 ]

1 голос
/ 02 июля 2019

Для версий Spark <2.4 вы можете достичь этого без <code>udf, используя sort_array и struct.

Сначала получите список столбцов для сортировки

cols_to_sort = df.columns[2:]
print(cols_to_sort)
#['a', 'b', 'c']

Теперь создайте структуру с двумя элементами - "value" и "key"."key" - это имя столбца, а "value" - это значение столбца.Если вы гарантируете, что "value" стоит первым в struct, вы можете использовать sort_array для сортировки этого массива структур так, как вы хотите.

После того, как массив отсортирован, вам просто нужно перебрать его и извлечь часть "key", которая содержит имена столбцов.

from pyspark.sql.functions import array, col, lit, sort_array, struct
df.withColumn(
    "descending_columns", 
    array(
        *[
            sort_array(
                array(
                    *[
                        struct([col(c).alias("value"), lit(c).alias("key")]) 
                        for c in cols_to_sort
                    ]
                ), 
                asc=False
            )[i]["key"]
            for i in range(len(cols_to_sort))
        ]
    )
).show(truncate=False)
#+---+---+---+---+---+------------------+
#|ID |key|a  |b  |c  |descending_columns|
#+---+---+---+---+---+------------------+
#|0  |1  |5  |2  |1  |[a, b, c]         |
#|1  |1  |3  |4  |5  |[c, b, a]         |
#+---+---+---+---+---+------------------+

Даже если это выглядит сложно, оно должнообеспечивает более высокую производительность, чем решение udf.


Обновление : для сортировки по исходному порядку столбцов в случае привязки значения можно вставить другое значение вструктура, которая содержит индекс.Так как сортировка по убыванию, мы используем отрицательный индекс.

Например, если ваш входной фрейм данных был следующим:

df.show()
#+---+---+---+---+---+
#| ID|key|  a|  b|  c|
#+---+---+---+---+---+
#|  0|  1|  5|  2|  1|
#|  1|  1|  3|  4|  5|
#|  2|  1|  4|  4|  5|
#+---+---+---+---+---+

Последняя строка выше имеет связующее значение между a и b.В этом случае мы хотим a отсортировать до b.

df.withColumn(
    "descending_columns", 
    array(
        *[
            sort_array(
                array(
                    *[
                        struct(
                            [
                                col(c).alias("value"), 
                                lit(-j).alias("index"), 
                                lit(c).alias("key")
                            ]
                        ) 
                        for j, c in enumerate(cols_to_sort)
                    ]
                ), 
                asc=False
            )[i]["key"]
            for i in range(len(cols_to_sort))
        ]
    )
).show(truncate=False)
#+---+---+---+---+---+------------------+
#|ID |key|a  |b  |c  |descending_columns|
#+---+---+---+---+---+------------------+
#|0  |1  |5  |2  |1  |[a, b, c]         |
#|1  |1  |3  |4  |5  |[c, b, a]         |
#|2  |1  |4  |4  |5  |[c, a, b]         |
#+---+---+---+---+---+------------------+
1 голос
/ 02 июля 2019

Вы можете вставить столбцы в одну структуру и обработать ее в формате udf.

from pyspark.sql import functions as F
from pyspark.sql import types as T

name_cols = ['a', 'b', 'c']

def ordered_columns(row):
    return [x for _,x in sorted(zip(row.asDict().values(), name_cols), reverse=True)]
udf_ordered_columns = F.udf(ordered_columns, T.ArrayType(T.StringType()))

df1 = (
    df
    .withColumn(
        'row',
        F.struct(*name_cols)
    )
    .withColumn(
        'descending_columns',
        udf_ordered_columns('row')
    )
)

Что-то вроде этого должно работать, если выше не работает, тогда дайте мне знать.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...