Добавление ключей словаря в качестве имени столбца и значения словаря в качестве постоянного значения этого столбца в Pyspark df - PullRequest
0 голосов
/ 04 декабря 2018

У меня есть словарь x = {'colA': 20, 'colB': 30} и pyspark df.

ID Value
1  ABC
1  BCD
1  AKB
2  CAB
2  AIK
3  KIB 

Я хочу создать df1, используя x следующим образом:

ID Value colA colB
1  ABC    20.0  30.0
1  BCD    20.0  30.0
1  AKB    20.0  30.0
2  CAB    20.0  30.0
...

Любая идея, как это сделать Pyspark,Я знаю, что могу создать постоянный столбец, подобный этому,

df1 = df.withColumn('colA', lit(20.0))
df1 = df1.withColumn('colB', lit(30.0))

Но не уверен насчет динамического процесса, чтобы сделать это из словаря

Ответы [ 2 ]

0 голосов
/ 04 декабря 2018

Есть способы скрыть цикл, но выполнение будет таким же.Например, вы можете использовать select:

from pyspark.sql.functions import lit

df2 = df.select("*", *[lit(val).alias(key) for key, val in x.items()])
df2.show()
#+---+-----+----+----+
#| ID|Value|colB|colA|
#+---+-----+----+----+
#|  1|  ABC|  30|  20|
#|  1|  BCD|  30|  20|
#|  1|  AKB|  30|  20|
#|  2|  CAB|  30|  20|
#|  2|  AIK|  30|  20|
#|  3|  KIB|  30|  20|
#+---+-----+----+----+

или functools.reduce и withColumn:

from functools import reduce
df3 = reduce(lambda df, key: df.withColumn(key, lit(x[key])), x, df)
df3.show()
# Same as above

или pyspark.sql.functions.struct с select() и синтаксис "*" :

from pyspark.sql.functions import struct
df4 = df.withColumn('x', struct([lit(val).alias(key) for key, val in x.items()]))\
    .select("ID", "Value", "x.*")
df4.show()
#Same as above

Но если вы посмотрите на план выполнения этих методов, вы увидите, что они абсолютно одинаковы:

df2.explain()
#== Physical Plan ==
#*Project [ID#44L, Value#45, 30 AS colB#151, 20 AS colA#152]
#+- Scan ExistingRDD[ID#44L,Value#45]

df3.explain()
#== Physical Plan ==
#*Project [ID#44L, Value#45, 30 AS colB#102, 20 AS colA#107]
#+- Scan ExistingRDD[ID#44L,Value#45]

df4.explain()
#== Physical Plan ==
#*Project [ID#44L, Value#45, 30 AS colB#120, 20 AS colA#121]
#+- Scan ExistingRDD[ID#44L,Value#45]

Далее, если вы сравните метод loop в @ anil's answer :

df1 = df  
for key in x:
    df1 = df1.withColumn(key, lit(x[key]))
df1.explain()
#== Physical Plan ==
#*Project [ID#44L, Value#45, 30 AS colB#127, 20 AS colA#132]
#+- Scan ExistingRDD[ID#44L,Value#45]

Вы увидите, что это тоже самое.

0 голосов
/ 04 декабря 2018

Переберите словарь, как показано ниже

df1 = df  
for key in x:
    df1 = df1.withColumn(key, lit(x[key]))
...