Побитовые операции в pyspark без использования udf - PullRequest
0 голосов
/ 07 августа 2020

У меня есть фрейм данных Spark, как показано ниже:

+---------+---------------------------+
|country  |sports                     |
+---------+---------------------------+
|India    |[Cricket, Hockey, Football]|
|Sri Lanka|[Cricket, Football]        |
+---------+---------------------------+

Каждый вид спорта в столбце Sports представлен кодом:

sport_to_code_map = {
'Cricket' : 0x0001,
'Hockey' : 0x0002,
'Football' : 0x0004
}

Теперь я хочу добавить новый столбец с именем sportsInt, который является результатом поразрядного или каждого кода, связанного со спортивной строкой на приведенной выше карте, что приводит к:

+---------+---------------------------+---------+
|country  |sports                     |sportsInt|
+---------+---------------------------+---------+
|India    |[Cricket, Hockey, Football]|7        |
|Sri Lanka|[Cricket, Football]        |5        |
+---------+---------------------------+---------+

Я знаю один из способов сделать это - использовать UDF, и это будет примерно так:

def get_sport_to_code(sport_name):

    sport_to_code_map = {
        'Cricket': 0x0001,
        'Hockey': 0x0002,
        'Football': 0x0004
    }

    if feature not in sport_to_code_map:
        raise Exception(f'Unknown Sport: {sport_name}')
    return sport_to_code_map.get(sport_name)

def sport_to_code(sports):
    if not sports:
        return None

    code = 0x0000
    for sport in sports:
        code = code | get_sport_to_code(sport)
    return code
import pyspark.sql.functions as F

sport_to_code_udf = F.udf(sport_to_code, F.StringType())
df.withColumn('sportsInt',sport_to_code_udf('sports'))

Но есть ли способ сделать это с помощью искровых функций? а не udf?

1 Ответ

0 голосов
/ 07 августа 2020

From Spark-2.4+ мы можем использовать aggregate более высокий порядок функцию с оператором bitwise or для этого случая.

Example:

from pyspark.sql.types import *
from pyspark.sql.functions import *

sport_to_code_map = {
'Cricket' : 0x0001,
'Hockey' : 0x0002,
'Football' : 0x0004
}

#creating dataframe from dictionary
lookup=spark.createDataFrame(*[zip(sport_to_code_map.keys(),sport_to_code_map.values())],["key","value"])

#sample dataframe
df.show(10,False)
#+---------+---------------------------+
#|country  |sports                     |
#+---------+---------------------------+
#|India    |[Cricket, Hockey, Football]|
#|Sri Lanka|[Cricket, Football]        |
#+---------+---------------------------+

df1=df.selectExpr("explode(sports) as key","country")

df2=df1.join(lookup,['key'],'left').\
groupBy("country").\
agg(collect_list(col("key")).alias("sports"),collect_list(col("value")).alias("sportsInt"))

df2.withColumn("sportsInt",expr('aggregate(sportsInt,0,(s,x) -> int(s) | int(x))')).\
show(10,False)
#+---------+---------------------------+---------+
#|country  |sports                     |sportsInt|
#+---------+---------------------------+---------+
#|Sri Lanka|[Cricket, Football]        |5        |
#|India    |[Cricket, Hockey, Football]|7        |
#+---------+---------------------------+---------+

Если вы хотите избежать соединения для поиска в sport_to_code_map dict, используйте .replace:

#converting dict values to string
sport_to_code_map={k:str(v) for k,v in sport_to_code_map.items()}

df1.replace(sport_to_code_map).show()
#+---+---------+
#|key|  country|
#+---+---------+
#|  1|    India|
#|  2|    India|
#|  4|    India|
#|  1|Sri Lanka|
#|  4|Sri Lanka|
#+---+---------+
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...