Создать новый столбец PySpark SQL - Python - PullRequest
2 голосов
/ 25 апреля 2020

Я пытаюсь создать новый столбец с тремя возможными значениями DEF, FWD, MID.

DEF= ['LB','LWB','RB','LCB','RCB','CB','RWB']
FWD= ['RF','LF','LW','RS','RW','LS','CF','ST'] 
MID= ['LCM','LM','RDM','CAM','RAM','RCM','CM','CDM','RM','LAM','LDM'] 

Я использую набор данных ФИФА о футболистах. Я хочу создать новый столбец, который сообщает нам, является ли игрок DEF, FWD, MID в зависимости от позиции, в которой он играет.

Мой текущий код:

  df_kmeans = dffiltereds.withColumn("Position_Group", when(col('Position')==DEF,'DEF').when(col('Position')==FWD,'FWD').when(col('Position')==MID,'MID'))

Но я продолжаю получать Сообщения об ошибках. Любая помощь приветствуется.

Я получаю очень длинное сообщение об ошибке:

Py4JJavaError                             Traceback (most recent call last)
<ipython-input-13-dff2b83d143f> in <module>
      9 #
     10 
---> 11 df_kmeans_new = dffiltereds.withColumn("Position_Group", when(col('Position')==DEF,'DEF').when(col('Position')==FWD,'FWD').when(col('Position')==MID,'MID'))

C:\spark-3.0.0-preview2-bin-hadoop2.7\python\pyspark\sql\column.py in _(self, other)
    113     def _(self, other):
    114         jc = other._jc if isinstance(other, Column) else other
--> 115         njc = getattr(self._jc, name)(jc)
    116         return Column(njc)
    117     _.__doc__ = doc

C:\spark-3.0.0-preview2-bin-hadoop2.7\python\lib\py4j-0.10.8.1-src.zip\py4j\java_gateway.py in __call__(self, *args)
   1284         answer = self.gateway_client.send_command(command)
   1285         return_value = get_return_value(
-> 1286             answer, self.gateway_client, self.target_id, self.name)
   1287 
   1288         for temp_arg in temp_args:

C:\spark-3.0.0-preview2-bin-hadoop2.7\python\pyspark\sql\utils.py in deco(*a, **kw)
     96     def deco(*a, **kw):
     97         try:
---> 98             return f(*a, **kw)
     99         except py4j.protocol.Py4JJavaError as e:
    100             converted = convert_exception(e.java_exception)

C:\spark-3.0.0-preview2-bin-hadoop2.7\python\lib\py4j-0.10.8.1-src.zip\py4j\protocol.py in get_return_value(answer, gateway_client, target_id, name)
    326                 raise Py4JJavaError(
    327                     "An error occurred while calling {0}{1}{2}.\n".
--> 328                     format(target_id, ".", name), value)
    329             else:
    330                 raise Py4JError(

Py4JJavaError: An error occurred while calling o222.equalTo.
: java.lang.RuntimeException: Unsupported literal type class java.util.ArrayList [LB, LWB, RB, LCB, RCB, CB, RWB]
    at org.apache.spark.sql.catalyst.expressions.Literal$.apply(literals.scala:85)
    at org.apache.spark.sql.catalyst.expressions.Literal$.$anonfun$create$2(literals.scala:145)
    at scala.util.Failure.getOrElse(Try.scala:222)
    at org.apache.spark.sql.catalyst.expressions.Literal$.create(literals.scala:145)
    at org.apache.spark.sql.functions$.typedLit(functions.scala:132)
    at org.apache.spark.sql.functions$.lit(functions.scala:115)
    at org.apache.spark.sql.Column.$eq$eq$eq(Column.scala:280)
    at org.apache.spark.sql.Column.equalTo(Column.scala:303)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:564)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.base/java.lang.Thread.run(Thread.java:832)

1 Ответ

2 голосов
/ 25 апреля 2020

Для spark1.5+ вы можете использовать array_contains. Единственный улов в том, что вам нужно создать array of literals of your three lists и использовать expr для передачи column в value часть array_contains. Ссылка на array_contains (spark docs) .

В этом случае вам не нужно делать проверку для MID или создавать для нее массив, потому что вы можете просто сделать otherwise до negate the others.

#sample data
#df.show()
#+--------+
#|Position|
#+--------+
#|      RF|
#|     RDM|
#|      CB|
#+--------+

from pyspark.sql import functions as F
from pyspark.sql.functions import when

DEF= ['LB','LWB','RB','LCB','RCB','CB','RWB']
FWD= ['RF','LF','LW','RS','RW','LS','CF','ST'] 
MID= ['LCM','LM','RDM','CAM','RAM','RCM','CM','CDM','RM','LAM','LDM'] 

df.withColumn("DEF", F.array(*[F.lit(x) for x in DEF]))\
  .withColumn("FWD", F.array(*[F.lit(x) for x in FWD]))\
  .withColumn("Position_2", F.when(F.expr("""array_contains(DEF,Position)""")==True, F.lit('DEF'))\
                             .when(F.expr("""array_contains(FWD,Position)""")==True, F.lit('FWD'))\
                              .otherwise(F.lit("MID"))).drop("DEF","MID","FWD").show()

#+--------+----------+
#|Position|Position_2|
#+--------+----------+
#|      RF|       FWD|
#|     RDM|       MID|
#|      CB|       DEF|
#+--------+----------+

Для spark2.4+ вы можете использовать arrays_overlap

from pyspark.sql import functions as F
from pyspark.sql.functions import when
df.withColumn("Position_2", F.when(F.arrays_overlap(F.array(*(F.lit(x) for x in DEF)),F.array("Position")),F.lit('DEF'))\
                             .when(F.arrays_overlap(F.array(*(F.lit(x) for x in FWD)),F.array("Position")),F.lit('FWD'))\
                             .otherwise(F.lit('MID'))).show()

I overlooked a rather simple method, используя isin. (Spark1.5 +).

df.withColumn("Position_2", F.when(F.col("Position").isin(DEF),F.lit('DEF'))\
                             .when(F.col("Position").isin(FWD),F.lit('FWD'))\
                             .otherwise(F.lit('MID'))).show()
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...