org. apache .spark. sql .AnalysisException: не удается разрешить UDF (df ["columnName"]) - PullRequest
0 голосов
/ 06 апреля 2020

У меня есть требование генерировать выражение Select (с coalesce fucntion и UDF-checkNullUdf) динамически на основе нескольких условий. для которого я написал код, как показано ниже:

column_expr=''
for row in PRIORITIZATION.rdd.collect():
    x=row.__fields__
    colName=row.Element
    i=0
    lst=['']
    for col in row:
        if (col is not None and i != 0 and i != 1) :
            dfColName="checkNullUdf("+x[i]+"[\""+colName+"\"])"
            lst.insert(int(col)-1,dfColName)
        i+=1
    lst.remove('')
    print(lst)
    lst=",".join(lst)
    column_expr=column_expr+"coalesce("+lst+"),"
column_expr=column_expr[:-1]
print("Final String is: " +column_expr) 

, который дает следующий вывод, как и ожидалось

Final String is: coalesce(checkNullUdf(GOLDEN_RECORD["Well_Name"]),checkNullUdf(SAP["Well_Name"])),coalesce(checkNullUdf(GOLDEN_RECORD["Country"])),coalesce(checkNullUdf(SAP["City"])),coalesce(checkNullUdf(SAP["State"])),coalesce(checkNullUdf(BITPULSE["Spud_Date"])),coalesce(checkNullUdf(BITPULSE["Plug_Date"])),coalesce(checkNullUdf(WELLDB["Water_Depth"]),checkNullUdf(WEBBIT["Water_Depth"]),checkNullUdf(BITPULSE["Water_Depth"])),coalesce(checkNullUdf(WELLDB["County"])),coalesce(checkNullUdf(GOLDEN_RECORD["Operator"])),coalesce(checkNullUdf(GOLDEN_RECORD["Field_Name"])),coalesce(checkNullUdf(GOLDEN_RECORD["Long"]))

Но при передаче этой строки в выражение выбора, как показано ниже, я получаю ошибка:

RESULT_REC = GOLDEN_RECORD.join(BITPULSE, GOLDEN_RECORD.BitPulse_rec_Id==BITPULSE.UUID, "left_outer").join(SAP, GOLDEN_RECORD.SAP_rec_Id==SAP.UUID, "left_outer").join(WEBBIT, GOLDEN_RECORD.MDM_Well_Id==WEBBIT.UUID, "left_outer").join(WELLDB, GOLDEN_RECORD.WellDB_rec_Id==WELLDB.UUID, "left_outer").select(column_expr)

Ошибка указана ниже:

Py4JJavaError                             Traceback (most recent call last)
C:\spark\spark-2.4.5-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\sql\utils.py in deco(*a, **kw)
     62         try:
---> 63             return f(*a, **kw)
     64         except py4j.protocol.Py4JJavaError as e:

C:\spark\spark-2.4.5-bin-hadoop2.7\python\lib\py4j-0.10.7-src.zip\py4j\protocol.py in get_return_value(answer, gateway_client, target_id, name)
    327                     "An error occurred while calling {0}{1}{2}.\n".
--> 328                     format(target_id, ".", name), value)
    329             else:

Py4JJavaError: An error occurred while calling o75.select.
: org.apache.spark.sql.AnalysisException: cannot resolve '`coalesce(checkNullUdf(GOLDEN_RECORD["Well_Name"]),checkNullUdf(SAP["Well_Name"])),coalesce(checkNullUdf(GOLDEN_RECORD["Country"])),coalesce(checkNullUdf(SAP["City"])),coalesce(checkNullUdf(SAP["State"])),coalesce(checkNullUdf(BITPULSE["Spud_Date"])),coalesce(checkNullUdf(BITPULSE["Plug_Date"])),coalesce(checkNullUdf(WELLDB["Water_Depth"]),checkNullUdf(WEBBIT["Water_Depth"]),checkNullUdf(BITPULSE["Water_Depth"])),coalesce(checkNullUdf(WELLDB["County"])),coalesce(checkNullUdf(GOLDEN_RECORD["Operator"])),coalesce(checkNullUdf(GOLDEN_RECORD["Field_Name"])),coalesce(checkNullUdf(GOLDEN_RECORD["Long"]))`' given input columns: [State, Country, UUID, County, City, WellDB_rec_Id, Spud_Date, UUID, Well_Name, Water_Depth, State, Spud_Date, UUID, SAP_rec_Id, Country, Plug_Date, County, State, UUID, Spud_Date, Field_Name, BitPulse_rec_Id, Operator, Well_Name, Water_Depth, Country, MDM_Well_Id, UUID, Water_Depth, Field_Name, Well_Name, Long];;
'Project ['coalesce(checkNullUdf(GOLDEN_RECORD["Well_Name"]),checkNullUdf(SAP["Well_Name"])),coalesce(checkNullUdf(GOLDEN_RECORD["Country"])),coalesce(checkNullUdf(SAP["City"])),coalesce(checkNullUdf(SAP["State"])),coalesce(checkNullUdf(BITPULSE["Spud_Date"])),coalesce(checkNullUdf(BITPULSE["Plug_Date"])),coalesce(checkNullUdf(WELLDB["Water_Depth"]),checkNullUdf(WEBBIT["Water_Depth"]),checkNullUdf(BITPULSE["Water_Depth"])),coalesce(checkNullUdf(WELLDB["County"])),coalesce(checkNullUdf(GOLDEN_RECORD["Operator"])),coalesce(checkNullUdf(GOLDEN_RECORD["Field_Name"])),coalesce(checkNullUdf(GOLDEN_RECORD["Long"]))]
+- Join LeftOuter, (WellDB_rec_Id#14 = UUID#124)
   :- Join LeftOuter, (MDM_Well_Id#11 = UUID#108)
   :  :- Join LeftOuter, (SAP_rec_Id#12 = UUID#88)
   :  :  :- Join LeftOuter, (BitPulse_rec_Id#13 = UUID#40)
   :  :  :  :- Relation[UUID#10,MDM_Well_Id#11,SAP_rec_Id#12,BitPulse_rec_Id#13,WellDB_rec_Id#14,Well_Name#15,Country#16,Operator#17,Field_Name#18,Long#19] csv
   :  :  :  +- Relation[UUID#40,Spud_Date#41,Plug_Date#42,Water_Depth#43,Field_Name#44,State#45,County#46] csv
   :  :  +- Relation[UUID#88,Well_Name#89,Country#90,City#91,State#92] csv
   :  +- Relation[UUID#108,Spud_Date#109,Water_Depth#110] csv
   +- Relation[UUID#124,Well_Name#125,State#126,County#127,Country#128,Spud_Date#129,Water_Depth#130] csv
--------------------------------------------------------------------
AnalysisException: 'cannot resolve \'`coalesce(checkNullUdf(GOLDEN_RECORD["Well_Name"]),checkNullUdf(SAP["Well_Name"])),coalesce(checkNullUdf(GOLDEN_RECORD["Country"])),coalesce(checkNullUdf(SAP["City"])),coalesce(checkNullUdf(SAP["State"])),coalesce(checkNullUdf(BITPULSE["Spud_Date"])),coalesce(checkNullUdf(BITPULSE["Plug_Date"])),coalesce(checkNullUdf(WELLDB["Water_Depth"]),checkNullUdf(WEBBIT["Water_Depth"]),checkNullUdf(BITPULSE["Water_Depth"])),coalesce(checkNullUdf(WELLDB["County"])),coalesce(checkNullUdf(GOLDEN_RECORD["Operator"])),coalesce(checkNullUdf(GOLDEN_RECORD["Field_Name"])),coalesce(checkNullUdf(GOLDEN_RECORD["Long"]))`\' given input columns: [State, Country, UUID, County, City, WellDB_rec_Id, Spud_Date, UUID, Well_Name, Water_Depth, State, Spud_Date, UUID, SAP_rec_Id, Country, Plug_Date, County, State, UUID, Spud_Date, Field_Name, BitPulse_rec_Id, Operator, Well_Name, Water_Depth, Country, MDM_Well_Id, UUID, Water_Depth, Field_Name, Well_Name, Long];;\n\'Project [\'coalesce(checkNullUdf(GOLDEN_RECORD["Well_Name"]),checkNullUdf(SAP["Well_Name"])),coalesce(checkNullUdf(GOLDEN_RECORD["Country"])),coalesce(checkNullUdf(SAP["City"])),coalesce(checkNullUdf(SAP["State"])),coalesce(checkNullUdf(BITPULSE["Spud_Date"])),coalesce(checkNullUdf(BITPULSE["Plug_Date"])),coalesce(checkNullUdf(WELLDB["Water_Depth"]),checkNullUdf(WEBBIT["Water_Depth"]),checkNullUdf(BITPULSE["Water_Depth"])),coalesce(checkNullUdf(WELLDB["County"])),coalesce(checkNullUdf(GOLDEN_RECORD["Operator"])),coalesce(checkNullUdf(GOLDEN_RECORD["Field_Name"])),coalesce(checkNullUdf(GOLDEN_RECORD["Long"]))]\n+- Join LeftOuter, (WellDB_rec_Id#14 = UUID#124)\n   :- Join LeftOuter, (MDM_Well_Id#11 = UUID#108)\n   :  :- Join LeftOuter, (SAP_rec_Id#12 = UUID#88)\n   :  :  :- Join LeftOuter, (BitPulse_rec_Id#13 = UUID#40)\n   :  :  :  :- Relation[UUID#10,MDM_Well_Id#11,SAP_rec_Id#12,BitPulse_rec_Id#13,WellDB_rec_Id#14,Well_Name#15,Country#16,Operator#17,Field_Name#18,Long#19] csv\n   :  :  :  +- Relation[UUID#40,Spud_Date#41,Plug_Date#42,Water_Depth#43,Field_Name#44,State#45,County#46] csv\n   :  :  +- Relation[UUID#88,Well_Name#89,Country#90,City#91,State#92] csv\n   :  +- Relation[UUID#108,Spud_Date#109,Water_Depth#110] csv\n   +- Relation[UUID#124,Well_Name#125,State#126,County#127,Country#128,Spud_Date#129,Water_Depth#130] csv\n'
20/04/07 00:53:33 INFO SparkContext: Invoking stop() from shutdown hook

, но если я скопирую вставку вывода Final String в выражение select, он работает нормально, и я получаю ожидаемый результат.

Не уверен, почему я получаю ошибку при передаче column_expr

Ответы [ 2 ]

0 голосов
/ 07 апреля 2020

Вы можете запустить выражение String SQL, используя spar.sql(" SQL expression as a String ")

val employees = spark.createDataFrame(Seq(("E1",100.0,"a,b"), ("E2",200.0,"e,f"),(null,300.0,"c,d"))).toDF("employee","salary","clubs")

employees.createTempView("employees")
spark.sql("select coalesce(employee,salary) as emp_or_sal from employees").show()

Результат -

+----------+
|emp_or_sal|
+----------+
|        E1|
|        E2|
|     300.0|
+----------+
0 голосов
/ 06 апреля 2020

Используйте selectExpr вместо select. Если вы посмотрите на ошибку, вы увидите обратные пометки вокруг всего выражения, что означает, что Spark пытается найти столбец, имя которого соответствует этой строке ВСЕ. selectExpr будет оценивать каждый передаваемый вами параметр как выражение SQL, приводящее к одному столбцу, тогда как select ожидает, что каждый параметр будет именованным столбцом.

Также примечательно:

Трудно отследить ваше намерение с помощью опубликованного кода, но здесь есть некоторые подозрительные вещи. Самый большой, что выпрыгивает на меня, это checkNullUdf. Пользовательские функции будут иметь значительно худшую производительность, чем встроенные функции. Вы также, похоже, делаете collect для получения имен столбцов, которые, если нет ничего особенного в их значениях, не нужны, потому что вы можете просто использовать DataFrame.columns.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...