Прежде всего давайте создадим наш тестовый фрейм данных.
>>> import pandas as pd
>>> data = {
"ID1": [1, 2, 5, 1],
"ID2": [1, 1, 3, 3],
"ID3": [4, 3, 2, 4],
"Visits": [500, 100, 200, 200],
"Investment": [1000, 200, 400, 200]
}
>>> df = spark.createDataFrame(pd.DataFrame(data))
>>> df.show()
+---+---+---+------+----------+
|ID1|ID2|ID3|Visits|Investment|
+---+---+---+------+----------+
| 1| 1| 4| 500| 1000|
| 2| 1| 3| 100| 200|
| 5| 3| 2| 200| 400|
| 1| 3| 4| 200| 200|
+---+---+---+------+----------+
Как только у нас будет DataFrame, с которым мы можем работать, мы должны определить функцию, которая будет возвращать список уникальных идентификаторов из столбцов ID1
, ID2
и ID3
.
>>> import pyspark.sql.functions as F
>>> from pyspark.sql.types import ArrayType, IntegerType
>>> @F.udf(returnType=ArrayType(IntegerType()))
... def ids_list(*cols):
... return list(set(cols))
Теперь пришло время применить наш udf
к кадру данных.
>>> df = df.withColumn('ids', ids_list('ID1', 'ID2', 'ID3'))
>>> df.show()
+---+---+---+------+----------+---------+
|ID1|ID2|ID3|Visits|Investment| ids|
+---+---+---+------+----------+---------+
| 1| 1| 4| 500| 1000| [1, 4]|
| 2| 1| 3| 100| 200|[1, 2, 3]|
| 5| 3| 2| 200| 400|[2, 3, 5]|
| 1| 3| 4| 200| 200|[1, 3, 4]|
+---+---+---+------+----------+---------+
Чтобы использовать столбец ids
, нам нужно взорватьсяэто в отдельные строки и опускает ids
столбец.
>>> df = df.withColumn("ID", F.explode('ids')).drop('ids')
>>> df.show()
+---+---+---+------+----------+---+
|ID1|ID2|ID3|Visits|Investment| ID|
+---+---+---+------+----------+---+
| 1| 1| 4| 500| 1000| 1|
| 1| 1| 4| 500| 1000| 4|
| 2| 1| 3| 100| 200| 1|
| 2| 1| 3| 100| 200| 2|
| 2| 1| 3| 100| 200| 3|
| 5| 3| 2| 200| 400| 2|
| 5| 3| 2| 200| 400| 3|
| 5| 3| 2| 200| 400| 5|
| 1| 3| 4| 200| 200| 1|
| 1| 3| 4| 200| 200| 3|
| 1| 3| 4| 200| 200| 4|
+---+---+---+------+----------+---+
Наконец мы должны сгруппировать наш DataFrame по столбцу ID
и вычислить суммы.Окончательный результат упорядочен по ID
.
>>> final_df = (
... df.groupBy('ID')
... .agg( F.sum('Visits'), F.sum('Investment') )
... .orderBy('ID')
... )
>>> final_df.show()
+---+-----------+---------------+
| ID|sum(Visits)|sum(Investment)|
+---+-----------+---------------+
| 1| 800| 1400|
| 2| 300| 600|
| 3| 500| 800|
| 4| 700| 1200|
| 5| 200| 400|
+---+-----------+---------------+
Надеюсь, вы сделаете его полезным.