Как исправить приведенную ниже проблему при создании производного столбца в pyspark? - PullRequest
0 голосов
/ 22 апреля 2019

Я пытаюсь выполнить группирование для определенного столбца в кадре данных на основе данных, указанных в словаре.

Ниже приведен кадр данных, который я использовал:

df

SNo,Name,CScore
1,x,700
2,y,850
3,z,560
4,a,578
5,b,456
6,c,678

Я создал указанную ниже функцию, она работает нормально, если я использую ее отдельно.


def binning(column,dict):
    finalColumn=[]
    lent = len(column)
    for i in range(lent):
        for j in range(len(list(dict))):
            if( int(column[i]) in range(list(dict)[j][0],list(dict)[j][1])):
                finalColumn.append(dict[list(dict)[j]])
    return finalColumn

Я использовал вышеуказанную функцию в приведенном ниже утверждении.

newDf = df.withColumn("binnedColumn",binning(df.select("CScore").rdd.flatMap(lambda x: x).collect(),{(1,400):'Low',(401,900):'High'}))

Iполучаю следующее сообщение об ошибке:

Traceback (последний вызов был последним): файл "", строка 1, в файле "C: \ spark_2.4 \ python \ pyspark \ sql \ dataframe.py", строка 1988, в withColumn assert isinstance (col, Column), «col должно быть Column» AssertionError: col должно быть Column

Любая помощь для решения этой проблемы будет очень полезна. Спасибо.

1 Ответ

0 голосов
/ 23 апреля 2019

Давайте начнем с создания данных:

rdd = sc.parallelize([[1,"x",700],[2,"y",850],[3,"z",560],[4,"a",578],[5,"b",456],[6,"c",678]])
df = rdd.toDF(["SNo","Name","CScore"])
>>> df.show()
+---+----+------+
|SNo|Name|CScore|
+---+----+------+
|  1|   x|   700|
|  2|   y|   850|
|  3|   z|   560|
|  4|   a|   578|
|  5|   b|   456|
|  6|   c|   678|
+---+----+------+

, если вашей конечной целью является предоставление binning_dictionary, как в вашем примере, чтобы найти соответствующее категориальное значение.udf - это решение.

следующая ваша обычная функция:

bin_lookup = {(1,400):'Low',(401,900):'High'}
def binning(value, lookup_dict=bin_lookup):
    for key in lookup_dict.keys():
        if key[0] <= value <= key[1]:
             return lookup_dict[key]

, чтобы зарегистрироваться как udf и запустить его через фрейм данных:

from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

binning_udf = udf(binning, StringType())
>>> df.withColumn("binnedColumn", binning_udf("CScore")).show()
+---+----+------+------------+
|SNo|Name|CScore|binnedColumn|
+---+----+------+------------+
|  1|   x|   700|        High|
|  2|   y|   850|        High|
|  3|   z|   560|        High|
|  4|   a|   578|        High|
|  5|   b|   456|        High|
|  6|   c|   678|        High|
+---+----+------+------------+

напрямую относится кrdd:

>>> rdd.map(lambda row: binning(row[-1], bin_lookup)).collect()
['High', 'High', 'High', 'High', 'High', 'High']

...