Udf не работает - PullRequest
       14

Udf не работает

0 голосов
/ 20 мая 2018

Можете ли вы помочь мне оптимизировать этот код и заставить его работать?это исходные данные:

+--------------------+-------------+
|       original_name|medicine_name|
+--------------------+-------------+
|         Venlafaxine|  Venlafaxine|
|    Lacrifilm 5mg/ml|    Lacrifilm|
|    Lacrifilm 5mg/ml|         null|
|         Venlafaxine|         null|
|Vitamin D10,000IU...|         null|
|         paracetamol|         null|
|            mucolite|         null|

Я ожидаю получить такие данные

+--------------------+-------------+
|       original_name|medicine_name|
+--------------------+-------------+
|         Venlafaxine|  Venlafaxine|
|    Lacrifilm 5mg/ml|    Lacrifilm|
|    Lacrifilm 5mg/ml|    Lacrifilm|
|         Venlafaxine|  Venlafaxine|
|Vitamin D10,000IU...|         null|
|         paracetamol|         null|
|            mucolite|         null|

Это код:

distinct_df = spark.sql("select distinct medicine_name as medicine_name from medicine where medicine_name is not null")
distinct_df.createOrReplaceTempView("distinctDF")

def getMax(num1, num2):
    pmax = (num1>=num2)*num1+(num2>num1)*num2
    return pmax

def editDistance(s1, s2):
    ed = (getMax(length(s1), length(s2)) - levenshtein(s1,s2))/
          getMax(length(s1), length(s2))
    return ed

editDistanceUdf = udf(lambda x,y: editDistance(x,y), FloatType())

def getSimilarity(str):
    res = spark.sql("select medicine_name, editDistanceUdf('str', medicine_name) from distinctDf where editDistanceUdf('str', medicine_name)>=0.85 order by 2")
    res['medicine_name'].take(1)
    return res

getSimilarityUdf = udf(lambda x: getSimilarity(x), StringType())
res_df = df.withColumn('m_name', when((df.medicine_name.isNull)|(df.medicine_name.=="null")),getSimilarityUdf(df.original_name)
.otherwise(df.medicine_name)).show()

теперь яполучая ошибку:

command_part = REFERENCE_TYPE + parameter._get_object_id () AttributeError: у объекта 'function' нет атрибута '_get_object_id'

1 Ответ

0 голосов
/ 21 мая 2018

Существует множество проблем с вашим кодом:

  • Вы не можете использовать SparkSession или распределенные объекты в udf.Так что getSimilarity просто не может работать.Если вы хотите сравнить подобные объекты, вы должны join.
  • Если length и levenshtein взяты из pyspark.sql.functions, их нельзя использовать внутри UserDefinedFunctions.Они предназначены для генерации SQL-выражений с отображением от *Column до Column.
  • Столбец isNull - это метод не property, поэтому его следует вызывать:

    df.medicine_name.isNull()
    
  • После

    df.medicine_name.=="null"
    

    не является синтаксически допустимым Python (выглядит как Scala Calque) и будет генерировать исключения компилятора.

  • If SparkSession доступ был разрешен в UserDefinedFunction это не будет допустимой заменой

    spark.sql("select medicine_name, editDistanceUdf('str', medicine_name) from distinctDf where editDistanceUdf('str', medicine_name)>=0.85 order by 2")
    

    Вы должны использовать методы форматирования строки

    spark.sql("select medicine_name, editDistanceUdf({str}, medicine_name) from distinctDf where editDistanceUdf({str}, medicine_name)>=0.85 order by 2".format(str=str))
    
  • Возможно, некоторыедругие проблемы, но так как вы не предоставили MCVE , все остальное было бы чисто догадкой.

Когда вы исправляете мелкие ошибки, у вас есть два варианта:

...