PySpark - получение агрегированных значений для динамических c столбцов в кадре данных - PullRequest
0 голосов
/ 26 мая 2020

У меня есть фрейм данных со следующими строками:

+------+--------+-------+-------+
| label| machine| value1| value2|
+------+--------+-------+-------+
|label1|machine1|     13|    7.5|
|label1|machine1|     9 |    7.5|
|label1|machine1|    8.5|    7.5|
|label1|machine1|   10.5|    7.5|
|label1|machine1|     12|      8|
|label1|machine2|     8 |   13.5|
|label1|machine2|     18|     10|
|label1|machine2|     10|     14|
|label1|machine2|     9 |   10.5|
|label1|machine2|    8.5|     10|
|label2|machine3|     8 |    7.5|
|label2|machine3|     18|    7.5|
|label2|machine3|     10|    7.5|
|label2|machine3|     9 |    7.5|
|label2|machine3|    8.5|      8|
|label2|machine4|   13.5|     13|
|label2|machine4|     10|      9|
|label2|machine4|     14|    8.5|
|label2|machine4|   10.5|   10.5|
|label2|machine4|     10|     12|
+------+--------+-------+-------+

Здесь я могу иметь несколько столбцов значений, кроме value1, value2, во фрейме данных. Для каждого столбца я хочу агрегировать значения с помощью collect_list и создать новый столбец во фрейме данных, чтобы я мог выполнять некоторые функции позже.

Для этого я пробовал вот так:

my_df = my_df.groupBy(['label', 'machine']). \
     agg(collect_list("value1").alias("col_value1"), collect_list("value2").alias("col_value2"))

Это дает мне следующие 4 строки, поскольку я группирую столбцы label и machine.

+------+--------+--------------------+--------------------+
| label| machine|    collected_value1|    collected_value2|
+------+--------+--------------------+--------------------+
|label1|machine1|[13.0, 9.0, 8.5, ...|[7.5, 7.5, 7.5, 7...|
|label2|machine2|[8.0, 18.0, 10.0,...|[13.5, 10.0, 14, ...|
|label1|machine3|[8.0, 18.0, 10.0,...|[7.5, 7.5, 7.5, 7...|
|label2|machine4|[13.5, 10.0, 14, ...|[13.0, 9.0, 8.5, ...|
+------+--------+--------------------+--------------------+

Теперь моя проблема заключается в том, как динамически передавать столбцы в эта группа. Столбцы могут отличаться для каждого прогона, поэтому я хочу использовать что-то вроде этого:

df_cols = ['value1', 'value2']

my_df = my_df.groupBy(['label', 'machine']). \
    agg(collect_list(col_name).alias(str(col_name+"_collected")) for col_name in df_cols)

Это дает мне AssertionError: all exprs should be Column ошибку.

Как я могу этого добиться? Может ли кто-нибудь помочь мне в этом?

Заранее спасибо.

1 Ответ

0 голосов
/ 26 мая 2020

Приведенный ниже код сработал. Спасибо.

exprs = [collect_list(x).alias(str(x+"_collected")) for x in df_cols]
my_df = my_df.groupBy(['label', 'machine']).agg(*exprs)
...