PySpark: как группировать с помощью или в столбцах - PullRequest
0 голосов
/ 20 сентября 2019

Я хочу сгруппировать в PySpark, но значение может отображаться более чем в столбцах, поэтому, если оно появится в любом из выбранных столбцов, оно будет сгруппировано по.

Например, если у меня есть этотаблица в Pyspark:

enter image description here

Я хочу суммировать посещения и инвестиции для каждого идентификатора, чтобы результат был:

enter image description here

Обратите внимание, что ID1 был суммой строк 0,1,3, которые имеют ID1 в одном из первых трех столбцов [Посещения ID1 = 500 + 100 + 200= 800].ID2 был суммой строк 1,2 и т. Д.

OBS 1: Ради простоты мой пример был простым кадром данных, но в действительности это гораздо больший df с большим количеством строк и большим количествомпеременных и других операций, а не просто «сумма».Это не может быть сработано на пандах, потому что оно слишком большое.Должно быть в PySpark

OBS2: Для иллюстрации я напечатал в pandas таблицы, но на самом деле это в PySpark

Я ценю всю помощь и большое спасибо заранее

Ответы [ 2 ]

2 голосов
/ 20 сентября 2019

Вы можете сделать что-то вроде следующего:

  1. Создать array из всех id столбцов-> ids столбец ниже
  2. explode ids столбец
  3. Теперь вы получите дубликаты, чтобы избежать дублирования агрегации, используйте distinct
  4. Наконец groupBy ids столбец и выполните все ваши агрегации

Примечание: : Если в вашем наборе данных могут быть точно повторяющиеся строки, добавьте один столбец с df.withColumn('uid', f.monotonically_increasing_id()) перед созданием массива, в противном случае distinct удалит его.

Пример набора данных:

import pyspark.sql.functions as f

df.withColumn('ids', f.explode(f.array('id1','id2','id3'))).distinct().groupBy('ids').agg(f.sum('visits'), f.sum('investments')).orderBy('ids').show()
+---+-----------+----------------+
|ids|sum(visits)|sum(investments)|
+---+-----------+----------------+
|  1|        800|            1400|
|  2|        300|             600|
|  3|        500|             800|
|  4|        700|            1200|
|  5|        200|             400|
+---+-----------+----------------+

1 голос
/ 20 сентября 2019

Прежде всего давайте создадим наш тестовый фрейм данных.

>>> import pandas as pd

>>> data = {
       "ID1": [1, 2, 5, 1],
       "ID2": [1, 1, 3, 3],
       "ID3": [4, 3, 2, 4],
       "Visits": [500, 100, 200, 200],
       "Investment": [1000, 200, 400, 200]
    }
>>> df = spark.createDataFrame(pd.DataFrame(data))
>>> df.show()

+---+---+---+------+----------+
|ID1|ID2|ID3|Visits|Investment|
+---+---+---+------+----------+
|  1|  1|  4|   500|      1000|
|  2|  1|  3|   100|       200|
|  5|  3|  2|   200|       400|
|  1|  3|  4|   200|       200|
+---+---+---+------+----------+

Как только у нас будет DataFrame, с которым мы можем работать, мы должны определить функцию, которая будет возвращать список уникальных идентификаторов из столбцов ID1, ID2 и ID3.

>>> import pyspark.sql.functions as F
>>> from pyspark.sql.types import ArrayType, IntegerType

>>> @F.udf(returnType=ArrayType(IntegerType()))
... def ids_list(*cols):
...    return list(set(cols))

Теперь пришло время применить наш udf к кадру данных.

>>> df = df.withColumn('ids', ids_list('ID1', 'ID2', 'ID3'))
>>> df.show()

+---+---+---+------+----------+---------+
|ID1|ID2|ID3|Visits|Investment|      ids|
+---+---+---+------+----------+---------+
|  1|  1|  4|   500|      1000|   [1, 4]|
|  2|  1|  3|   100|       200|[1, 2, 3]|
|  5|  3|  2|   200|       400|[2, 3, 5]|
|  1|  3|  4|   200|       200|[1, 3, 4]|
+---+---+---+------+----------+---------+

Чтобы использовать столбец ids, нам нужно взорватьсяэто в отдельные строки и опускает ids столбец.

>>> df = df.withColumn("ID", F.explode('ids')).drop('ids')
>>> df.show()

+---+---+---+------+----------+---+
|ID1|ID2|ID3|Visits|Investment| ID|
+---+---+---+------+----------+---+
|  1|  1|  4|   500|      1000|  1|
|  1|  1|  4|   500|      1000|  4|
|  2|  1|  3|   100|       200|  1|
|  2|  1|  3|   100|       200|  2|
|  2|  1|  3|   100|       200|  3|
|  5|  3|  2|   200|       400|  2|
|  5|  3|  2|   200|       400|  3|
|  5|  3|  2|   200|       400|  5|
|  1|  3|  4|   200|       200|  1|
|  1|  3|  4|   200|       200|  3|
|  1|  3|  4|   200|       200|  4|
+---+---+---+------+----------+---+

Наконец мы должны сгруппировать наш DataFrame по столбцу ID и вычислить суммы.Окончательный результат упорядочен по ID.

>>> final_df = (
...    df.groupBy('ID')
...       .agg( F.sum('Visits'), F.sum('Investment') )
...       .orderBy('ID')
... )
>>> final_df.show()

+---+-----------+---------------+
| ID|sum(Visits)|sum(Investment)|
+---+-----------+---------------+
|  1|        800|           1400|
|  2|        300|            600|
|  3|        500|            800|
|  4|        700|           1200|
|  5|        200|            400|
+---+-----------+---------------+

Надеюсь, вы сделаете его полезным.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...