I have a requirement to generate a Select expression (using the coalesce function and a UDF, checkNullUdf) dynamically based on several conditions, for which I wrote the code shown below:
column_expr = ''
for row in PRIORITIZATION.rdd.collect():
    x = row.__fields__             # field names of the prioritization row
    colName = row.Element          # target column name
    i = 0
    lst = ['']
    for col in row:
        # skip the first two metadata fields; col holds the priority rank
        if col is not None and i != 0 and i != 1:
            dfColName = 'checkNullUdf(' + x[i] + '["' + colName + '"])'
            lst.insert(int(col) - 1, dfColName)   # priority 1 goes first
        i += 1
    lst.remove('')                 # drop the empty sentinel entry
    print(lst)
    lst = ",".join(lst)
    column_expr = column_expr + "coalesce(" + lst + "),"
column_expr = column_expr[:-1]     # strip the trailing comma
print("Final String is: " + column_expr)
which gives the following output, as expected:
Final String is: coalesce(checkNullUdf(GOLDEN_RECORD["Well_Name"]),checkNullUdf(SAP["Well_Name"])),coalesce(checkNullUdf(GOLDEN_RECORD["Country"])),coalesce(checkNullUdf(SAP["City"])),coalesce(checkNullUdf(SAP["State"])),coalesce(checkNullUdf(BITPULSE["Spud_Date"])),coalesce(checkNullUdf(BITPULSE["Plug_Date"])),coalesce(checkNullUdf(WELLDB["Water_Depth"]),checkNullUdf(WEBBIT["Water_Depth"]),checkNullUdf(BITPULSE["Water_Depth"])),coalesce(checkNullUdf(WELLDB["County"])),coalesce(checkNullUdf(GOLDEN_RECORD["Operator"])),coalesce(checkNullUdf(GOLDEN_RECORD["Field_Name"])),coalesce(checkNullUdf(GOLDEN_RECORD["Long"]))
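The string-building loop can be reproduced without Spark. Below is a standalone sketch with made-up field names and priority values (a simplified two-source layout, not the actual PRIORITIZATION data), showing how the insert-by-priority logic assembles each coalesce(...) fragment:

```python
# Standalone sketch of the loop above, using plain Python tuples instead of
# Spark Rows. Field names and priorities here are illustrative only.

# Mimics PRIORITIZATION: field 0 is Element (the target column name),
# the remaining fields hold each source's priority (1 = first coalesce arg).
fields = ["Element", "GOLDEN_RECORD", "SAP"]
rows = [
    ("Well_Name", 1, 2),    # GOLDEN_RECORD first, then SAP
    ("Country",   1, None)  # only GOLDEN_RECORD participates
]

column_expr = ''
for row in rows:
    colName = row[0]
    lst = []
    for i, col in enumerate(row):
        if col is not None and i != 0:
            # insert at position col-1 so priority 1 lands first
            lst.insert(int(col) - 1, 'checkNullUdf(' + fields[i] + '["' + colName + '"])')
    column_expr += "coalesce(" + ",".join(lst) + "),"
column_expr = column_expr[:-1]      # strip the trailing comma
print(column_expr)
```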
But when I pass this string into the select expression as shown below, I get an error:
RESULT_REC = (GOLDEN_RECORD
              .join(BITPULSE, GOLDEN_RECORD.BitPulse_rec_Id == BITPULSE.UUID, "left_outer")
              .join(SAP, GOLDEN_RECORD.SAP_rec_Id == SAP.UUID, "left_outer")
              .join(WEBBIT, GOLDEN_RECORD.MDM_Well_Id == WEBBIT.UUID, "left_outer")
              .join(WELLDB, GOLDEN_RECORD.WellDB_rec_Id == WELLDB.UUID, "left_outer")
              .select(column_expr))
The error is shown below:
Py4JJavaError Traceback (most recent call last)
C:\spark\spark-2.4.5-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\sql\utils.py in deco(*a, **kw)
62 try:
---> 63 return f(*a, **kw)
64 except py4j.protocol.Py4JJavaError as e:
C:\spark\spark-2.4.5-bin-hadoop2.7\python\lib\py4j-0.10.7-src.zip\py4j\protocol.py in get_return_value(answer, gateway_client, target_id, name)
327 "An error occurred while calling {0}{1}{2}.\n".
--> 328 format(target_id, ".", name), value)
329 else:
Py4JJavaError: An error occurred while calling o75.select.
: org.apache.spark.sql.AnalysisException: cannot resolve '`coalesce(checkNullUdf(GOLDEN_RECORD["Well_Name"]),checkNullUdf(SAP["Well_Name"])),coalesce(checkNullUdf(GOLDEN_RECORD["Country"])),coalesce(checkNullUdf(SAP["City"])),coalesce(checkNullUdf(SAP["State"])),coalesce(checkNullUdf(BITPULSE["Spud_Date"])),coalesce(checkNullUdf(BITPULSE["Plug_Date"])),coalesce(checkNullUdf(WELLDB["Water_Depth"]),checkNullUdf(WEBBIT["Water_Depth"]),checkNullUdf(BITPULSE["Water_Depth"])),coalesce(checkNullUdf(WELLDB["County"])),coalesce(checkNullUdf(GOLDEN_RECORD["Operator"])),coalesce(checkNullUdf(GOLDEN_RECORD["Field_Name"])),coalesce(checkNullUdf(GOLDEN_RECORD["Long"]))`' given input columns: [State, Country, UUID, County, City, WellDB_rec_Id, Spud_Date, UUID, Well_Name, Water_Depth, State, Spud_Date, UUID, SAP_rec_Id, Country, Plug_Date, County, State, UUID, Spud_Date, Field_Name, BitPulse_rec_Id, Operator, Well_Name, Water_Depth, Country, MDM_Well_Id, UUID, Water_Depth, Field_Name, Well_Name, Long];;
'Project ['coalesce(checkNullUdf(GOLDEN_RECORD["Well_Name"]),checkNullUdf(SAP["Well_Name"])),coalesce(checkNullUdf(GOLDEN_RECORD["Country"])),coalesce(checkNullUdf(SAP["City"])),coalesce(checkNullUdf(SAP["State"])),coalesce(checkNullUdf(BITPULSE["Spud_Date"])),coalesce(checkNullUdf(BITPULSE["Plug_Date"])),coalesce(checkNullUdf(WELLDB["Water_Depth"]),checkNullUdf(WEBBIT["Water_Depth"]),checkNullUdf(BITPULSE["Water_Depth"])),coalesce(checkNullUdf(WELLDB["County"])),coalesce(checkNullUdf(GOLDEN_RECORD["Operator"])),coalesce(checkNullUdf(GOLDEN_RECORD["Field_Name"])),coalesce(checkNullUdf(GOLDEN_RECORD["Long"]))]
+- Join LeftOuter, (WellDB_rec_Id#14 = UUID#124)
:- Join LeftOuter, (MDM_Well_Id#11 = UUID#108)
: :- Join LeftOuter, (SAP_rec_Id#12 = UUID#88)
: : :- Join LeftOuter, (BitPulse_rec_Id#13 = UUID#40)
: : : :- Relation[UUID#10,MDM_Well_Id#11,SAP_rec_Id#12,BitPulse_rec_Id#13,WellDB_rec_Id#14,Well_Name#15,Country#16,Operator#17,Field_Name#18,Long#19] csv
: : : +- Relation[UUID#40,Spud_Date#41,Plug_Date#42,Water_Depth#43,Field_Name#44,State#45,County#46] csv
: : +- Relation[UUID#88,Well_Name#89,Country#90,City#91,State#92] csv
: +- Relation[UUID#108,Spud_Date#109,Water_Depth#110] csv
+- Relation[UUID#124,Well_Name#125,State#126,County#127,Country#128,Spud_Date#129,Water_Depth#130] csv
--------------------------------------------------------------------
20/04/07 00:53:33 INFO SparkContext: Invoking stop() from shutdown hook
However, if I copy-paste the Final String output into the select expression, it works fine and I get the expected result.
I am not sure why I get the error when passing column_expr.
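Judging by the backticks around the whole string in the error message, select(column_expr) appears to treat the entire string as the name of a single column. One thing I considered (a sketch, not a verified fix) is splitting the string on top-level commas only, so each coalesce(...) stays intact, and passing the pieces to selectExpr individually; note that each piece would still need to be valid Spark SQL (the GOLDEN_RECORD["Well_Name"] bracket form is Python DataFrame syntax, not SQL), and checkNullUdf would have to be registered with spark.udf.register to be usable inside a SQL string. The splitter itself is plain Python:

```python
def split_top_level(s):
    """Split a comma-separated expression string on commas that are NOT
    inside parentheses, so nested arguments of coalesce(...) stay together."""
    parts, depth, start = [], 0, 0
    for i, ch in enumerate(s):
        if ch == '(':
            depth += 1
        elif ch == ')':
            depth -= 1
        elif ch == ',' and depth == 0:
            parts.append(s[start:i])
            start = i + 1
    parts.append(s[start:])
    return parts

# Intended usage (untested against the real dataframes):
# exprs = split_top_level(column_expr)
# RESULT_REC = joined_df.selectExpr(*exprs)
```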