pyspark collect_set столбца вне группы - PullRequest
1 голос
/ 07 ноября 2019

Я пытаюсь использовать collect_set, чтобы получить список строк имен категорий, которые НЕ являются частью groupby. Мой код

from pyspark import SparkContext
from pyspark.sql import HiveContext
from pyspark.sql import functions as F

sc = SparkContext("local")
sqlContext = HiveContext(sc)
df = sqlContext.createDataFrame([
     ("1", "cat1", "Dept1", "product1", 7),
     ("2", "cat2", "Dept1", "product1", 100),
     ("3", "cat2", "Dept1", "product2", 3),
     ("4", "cat1", "Dept2", "product3", 5),
    ], ["id", "category_name", "department_id", "product_id", "value"])

df.show()
df.groupby("department_id", "product_id")\
    .agg({'value': 'sum'}) \
    .show()

#            .agg( F.collect_set("category_name"))\

Вывод

+---+-------------+-------------+----------+-----+
| id|category_name|department_id|product_id|value|
+---+-------------+-------------+----------+-----+
|  1|         cat1|        Dept1|  product1|    7|
|  2|         cat2|        Dept1|  product1|  100|
|  3|         cat2|        Dept1|  product2|    3|
|  4|         cat1|        Dept2|  product3|    5|
+---+-------------+-------------+----------+-----+

+-------------+----------+----------+
|department_id|product_id|sum(value)|
+-------------+----------+----------+
|        Dept1|  product2|         3|
|        Dept1|  product1|       107|
|        Dept2|  product3|         5|
+-------------+----------+----------+

Я хочу получить этот вывод

+-------------+----------+----------+----------------------------+
|department_id|product_id|sum(value)| collect_list(category_name)|
+-------------+----------+----------+----------------------------+
|        Dept1|  product2|         3|  cat2                      |
|        Dept1|  product1|       107|  cat1, cat2                |
|        Dept2|  product3|         5|  cat1                      |
+-------------+----------+----------+----------------------------+

Попытка 1

df.groupby("department_id", "product_id")\
    .agg({'value': 'sum'}) \
    .agg(F.collect_set("category_name")) \
    .show()

Я получил эту ошибку:

pyspark.sql.utils.AnalysisException: "не удается разрешить" category_name 'для указанных входных столбцов: [департамент_идентификатор_продукта, сумма (значение)];; \ n'Aggregate [collect_set ('category_name, 0, 0) AS collect_set (category_name) # 35] \ n + - Агрегировать [департамент_id # 2, product_id # 3], [департамент_id # 2, product_id # 3, сумма (значение# 4L) AS sum (value) # 24L] \ n + - LogicalRDD [id # 0, имя_категории # 1, идентификатор_достава # 2, product_id # 3, значение # 4L] \ n "

Попытка 2 Я помещаю category_name как часть groupby

df.groupby("category_name", "department_id", "product_id")\
    .agg({'value': 'sum'}) \
    .agg(F.collect_set("category_name")) \
    .show()

Это работает, но вывод неправильный

+--------------------------+
|collect_set(category_name)|
+--------------------------+
|              [cat1, cat2]|
+--------------------------+

1 Ответ

1 голос
/ 07 ноября 2019

Вы можете указать несколько агрегаций в одном agg(). Правильный синтаксис для вашего случая:

df.groupby("department_id", "product_id")\
    .agg(F.sum('value'), F.collect_set("category_name"))\
    .show()
#+-------------+----------+----------+--------------------------+
#|department_id|product_id|sum(value)|collect_set(category_name)|
#+-------------+----------+----------+--------------------------+
#|        Dept1|  product2|         3|                    [cat2]|
#|        Dept1|  product1|       107|              [cat1, cat2]|
#|        Dept2|  product3|         5|                    [cat1]|
#+-------------+----------+----------+--------------------------+

Ваш метод не работает, потому что первый .agg() работает на pyspark.sql.group.GroupedData и возвращает новый DataFrame. Последующий вызов agg на самом деле pyspark.sql.DataFrame.agg, что является

сокращением для df.groupBy.agg()

Таким образом, по сути, второй вызов agg снова группируется, что не соответствует вашим ожиданиям.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...