Сравните значение столбца со списком из 3 массивов и замените значение столбца именем массива - PullRequest
0 голосов
/ 27 мая 2020
from pyspark.sql.functions import when,col
from pyspark.sql.functions import udf
#Your code here to create a new variable df_kmeans_new with a new column Position_Group,..
from pyspark.sql.types import *

#Your code to complete
DEF= ["LB","LWB","RB","LCB","RCB","CB","RWB"]
FWD=  ["RF","LF","LW","RS","RW","LS","CF","ST"]
MID=  ["LCM","LM","RDM","CAM","RAM","RCM","CM","CDM","RM","LAM","LDM"]

df  = spark.createDataFrame(
    [(1, "LB", "4"), 
      (2, "LM", "0"), 
       (3, "LCB", "4"), 
        (4, "RS", "4")],
          ("id", "Position", "Position_x"))

def check_in_def(cell_val):
    if cell_val in DEF:
      return "DEF"
    elif cell_val in FWD:
      return "FWD"
    elif cell_val in MID:
      return "MID"
    else:
      return "NA"

df = df.withColumn("Position_Group",when(check_in_def(df.Position)=="DEF","DEF").when(check_in_def(df.Position)=="FWD","FWD").otherwise(0)).show()

Я хочу создать новый столбец в df, который будет содержать одно из трех имен массива: DEF, FWD и MID, если значение столбца Position найдено в конкретном массиве.

но код не работает ... пожалуйста, помогите!

Ответы [ 2 ]

1 голос
/ 27 мая 2020

Вместо функции вы также можете создать словарь, а затем перевернуть его, а затем сопоставить словарь с новым столбцом, используя create_map:

from itertools import chain
import pyspark.sql.functions as F

d = {"DEF":DEF,"FWD":FWD,"MID":MID}
d1 = {i:k for k,v in d.items() for i in v}

mapping = F.create_map([F.lit(x) for x in chain(*d1.items())])
df.withColumn("Position_Group",mapping[df['Position']]).show()

+---+--------+----------+--------------+
| id|Position|Position_x|Position_Group|
+---+--------+----------+--------------+
|  1|      LB|         4|           DEF|
|  2|      LM|         0|           MID|
|  3|     LCB|         4|           DEF|
|  4|      RS|         4|           FWD|
+---+--------+----------+--------------+
0 голосов
/ 27 мая 2020

Ваша функция не работает с withColumn, потому что она передает весь столбец вместо одного значения. С векторизацией вашего кода вместо написания кучи операторов if:

from pyspark.sql.functions import when,col
from pyspark.sql.functions import udf
#Your code here to create a new variable df_kmeans_new with a new column Position_Group,..
from pyspark.sql.types import *

#Your code to complete
dict = {
'DEF' : ["LB","LWB","RB","LCB","RCB","CB","RWB"]
,'FWD' :  ["RF","LF","LW","RS","RW","LS","CF","ST"]
,'MID' :  ["LCM","LM","RDM","CAM","RAM","RCM","CM","CDM","RM","LAM","LDM"]
}

df_map_list = []
for key, value in dict.items():
  for v in value:
    df_map_list.append((key, v))

df_map  = spark.createDataFrame(df_map_list, ('key', 'Position'))

df  = spark.createDataFrame(
    [(1, "LB", "4"), 
      (2, "LM", "0"), 
       (3, "LCB", "4"), 
        (4, "RS", "4")],
          ("id", "Position", "Position_x"))

df = df.alias('a').join(df_map.alias('b'), col('a.Position') == col('b.Position'), 'left').select(['a.*'] + [col('b.key').alias('Position_Group')])

df.show() 
...