Давайте начнем с создания данных:
rdd = sc.parallelize([[1,"x",700],[2,"y",850],[3,"z",560],[4,"a",578],[5,"b",456],[6,"c",678]])
df = rdd.toDF(["SNo","Name","CScore"])
>>> df.show()
+---+----+------+
|SNo|Name|CScore|
+---+----+------+
| 1| x| 700|
| 2| y| 850|
| 3| z| 560|
| 4| a| 578|
| 5| b| 456|
| 6| c| 678|
+---+----+------+
, если вашей конечной целью является предоставление binning_dictionary, как в вашем примере, чтобы найти соответствующее категориальное значение.udf - это решение.
следующая ваша обычная функция:
bin_lookup = {(1,400):'Low',(401,900):'High'}
def binning(value, lookup_dict=bin_lookup):
for key in lookup_dict.keys():
if key[0] <= value <= key[1]:
return lookup_dict[key]
, чтобы зарегистрироваться как udf и запустить его через фрейм данных:
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
binning_udf = udf(binning, StringType())
>>> df.withColumn("binnedColumn", binning_udf("CScore")).show()
+---+----+------+------------+
|SNo|Name|CScore|binnedColumn|
+---+----+------+------------+
| 1| x| 700| High|
| 2| y| 850| High|
| 3| z| 560| High|
| 4| a| 578| High|
| 5| b| 456| High|
| 6| c| 678| High|
+---+----+------+------------+
напрямую относится кrdd:
>>> rdd.map(lambda row: binning(row[-1], bin_lookup)).collect()
['High', 'High', 'High', 'High', 'High', 'High']