Сумма товара в PySpark - PullRequest
       15

Сумма товара в PySpark

0 голосов
/ 13 марта 2019

У меня есть такой фрейм данных pyspark

data = [(("ID1", 10, 30)), (("ID2", 20, 60))]
df1 = spark.createDataFrame(data, ["ID", "colA", "colB"])
df1.show()

df1: 
+---+-----------+
| ID| colA| colB|
+---+-----------+
|ID1|   10|   30|
|ID2|   20|   60| 
+---+-----------+

У меня есть другой фрейм данных, подобный этому

data = [(("colA", 2)), (("colB", 5))]
df2 = spark.createDataFrame(data, ["Column", "Value"])
df2.show()

df2:
+-------+------+
| Column| Value|
+-------+------+
|   colA|     2|
|   colB|     5| 
+-------+------+

Я хочу разделить каждый столбец в df1 на соответствующее значение в df2.Следовательно, df3 будет выглядеть как

df3: 
+---+-------------------------+
| ID|        colA|        colB|
+---+------------+------------+
|ID1|    10/2 = 5|    30/5 = 6|
|ID2|   20/2 = 10|   60/5 = 12| 
+---+------------+------------+

В конечном счете, я хочу добавить colA и colB, чтобы получить окончательный df4 для идентификатора

df4: 
+---+---------------+
| ID|       finalSum|
+---+---------------+
|ID1|     5 + 6 = 11|
|ID2|   10 + 12 = 22| 
+---+---------------+

1 Ответ

2 голосов
/ 13 марта 2019

Идея состоит в том, чтобы объединить оба DataFrames вместе, а затем применить операцию division.Так как df2 содержит имена столбцов и соответствующее значение, поэтому нам нужно сначала pivot(), а затем объединить с основной таблицей df1.(Поворот является дорогостоящей операцией, но она должна работать, пока размер DataFrame невелик.)

# Loading the requisite packages
from pyspark.sql.functions import col
from functools import reduce
from operator import add

# Creating the DataFrames
df1 = sqlContext.createDataFrame([('ID1', 10, 30), ('ID2', 20, 60)],('ID','ColA','ColB'))
df2 = sqlContext.createDataFrame([('ColA', 2), ('ColB', 5)],('Column','Value'))

Код довольно универсальный, поэтому нам не нужно указывать имена столбцов самостоятельно.,Мы находим имена столбцов, с которыми нам нужно работать.За исключением ID нам нужны все.

# This contains the list of columns where we apply mathematical operations
columns_to_be_operated = df1.columns
columns_to_be_operated.remove('ID')
print(columns_to_be_operated)
    ['ColA', 'ColB']

Поворот df2, к которому мы примем df1.

# Pivoting the df2 to get the rows in column form
df2 = df2.groupBy().pivot('Column').sum('Value')
df2.show()
+----+----+ 
|ColA|ColB| 
+----+----+ 
|   2|   5| 
+----+----+

Мы можем изменить имена столбцов, чтобыу нас нет повторяющегося имени для каждого столбца.Мы делаем это, добавляя суффикс _x ко всем именам.

# Dynamically changing the name of the columns in df2
df2 = df2.select([col(c).alias(c+'_x') for c in df2.columns])
df2.show()
+------+------+ 
|ColA_x|ColB_x| 
+------+------+ 
|     2|     5| 
+------+------+

Затем мы объединяем таблицы с помощью декартового объединения.(Обратите внимание, что у вас могут возникнуть проблемы с памятью, если df2 велико.)

df = df1.crossJoin(df2)
df.show()
+---+----+----+------+------+ 
| ID|ColA|ColB|ColA_x|ColB_x| 
+---+----+----+------+------+ 
|ID1|  10|  30|     2|     5| 
|ID2|  20|  60|     2|     5| 
+---+----+----+------+------+

Наконец добавьте столбцы, разделив их сначала соответствующим значением.reduce() применяет функцию add() двух аргументов, в совокупности, к элементам последовательности.

df = df.withColumn(
    'finalSum', 
    reduce(add, [col(c)/col(c+'_x') for c in columns_to_be_operated])
).select('ID','finalSum')

df.show()
+---+--------+ 
| ID|finalSum| 
+---+--------+ 
|ID1|    11.0| 
|ID2|    22.0| 
+---+--------+

Примечание: OP должен быть осторожен с делением на 0. Фрагмент чуть выше можно изменить, чтобы учесть это условие.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...