Идея состоит в том, чтобы объединить оба DataFrames вместе, а затем применить операцию division
.Так как df2
содержит имена столбцов и соответствующее значение, поэтому нам нужно сначала pivot()
, а затем объединить с основной таблицей df1
.(Поворот является дорогостоящей операцией, но она должна работать, пока размер DataFrame невелик.)
# Loading the requisite packages
from pyspark.sql.functions import col
from functools import reduce
from operator import add
# Creating the DataFrames
df1 = sqlContext.createDataFrame([('ID1', 10, 30), ('ID2', 20, 60)],('ID','ColA','ColB'))
df2 = sqlContext.createDataFrame([('ColA', 2), ('ColB', 5)],('Column','Value'))
Код довольно универсальный, поэтому нам не нужно указывать имена столбцов самостоятельно.,Мы находим имена столбцов, с которыми нам нужно работать.За исключением ID
нам нужны все.
# This contains the list of columns where we apply mathematical operations
columns_to_be_operated = df1.columns
columns_to_be_operated.remove('ID')
print(columns_to_be_operated)
['ColA', 'ColB']
Поворот df2
, к которому мы примем df1
.
# Pivoting the df2 to get the rows in column form
df2 = df2.groupBy().pivot('Column').sum('Value')
df2.show()
+----+----+
|ColA|ColB|
+----+----+
| 2| 5|
+----+----+
Мы можем изменить имена столбцов, чтобыу нас нет повторяющегося имени для каждого столбца.Мы делаем это, добавляя суффикс _x
ко всем именам.
# Dynamically changing the name of the columns in df2
df2 = df2.select([col(c).alias(c+'_x') for c in df2.columns])
df2.show()
+------+------+
|ColA_x|ColB_x|
+------+------+
| 2| 5|
+------+------+
Затем мы объединяем таблицы с помощью декартового объединения.(Обратите внимание, что у вас могут возникнуть проблемы с памятью, если df2
велико.)
df = df1.crossJoin(df2)
df.show()
+---+----+----+------+------+
| ID|ColA|ColB|ColA_x|ColB_x|
+---+----+----+------+------+
|ID1| 10| 30| 2| 5|
|ID2| 20| 60| 2| 5|
+---+----+----+------+------+
Наконец добавьте столбцы, разделив их сначала соответствующим значением.reduce()
применяет функцию add()
двух аргументов, в совокупности, к элементам последовательности.
df = df.withColumn(
'finalSum',
reduce(add, [col(c)/col(c+'_x') for c in columns_to_be_operated])
).select('ID','finalSum')
df.show()
+---+--------+
| ID|finalSum|
+---+--------+
|ID1| 11.0|
|ID2| 22.0|
+---+--------+
Примечание: OP должен быть осторожен с делением на 0. Фрагмент чуть выше можно изменить, чтобы учесть это условие.