У меня есть 2 Dataframe, df1 и df2:
df1:
+-------------------+----------+------------+
| df1.name |df1.state | df1.pincode|
+-------------------+----------+------------+
| CYBEX INTERNATION| HOUSTON | 00530 |
| FLUID POWER| MEDWAY | 02053 |
| REFINERY SYSTEMS| FRANCE | 072234 |
| K N ENTERPRISES| MUMBAI | 100010 |
+-------------------+----------+------------+
df2:
+--------------------+------------+------------+
| df2.name |df2.state | df2.pincode|
+--------------------+------------+------------+
|FLUID POWER PVT LTD | MEDWAY | 02053 |
| CYBEX INTERNATION | HOUSTON | 02356 |
|REFINERY SYSTEMS LTD| MUMBAI | 072234 |
+--------------------+------------+------------+
Моя работа заключается в проверке данных в df1 присутствует в df2, если он действительно validate = 1 else validate = 0. Теперь я выполняю некоторую операцию соединения с условием, состоянием и Pincode, и для сравнения строк я сначала преобразовываю строку в нижний регистр, сортируя и используя Python Соответствие последовательности. Ожидаемый вывод:
+-------------------+-------------------+----------+------------+------------+
| df1.name|df2.name |df1.state | df1.pincode| Validated |
+-------------------+-------------------+----------+------------+------------+
| CYBEX INTERNATION| NULL |HOUSTON | 00530 | 0 |
| FLUID POWER|FLUID POWER PVT LTD|MEDWAY | 02053 | 1 |
| REFINERY SYSTEMS| NULL |FRANCE | 072234 | 0 |
| K N ENTERPRISES| NULL |MUMBAI | 100010 | 0 |
+-------------------+-------------------+----------+------------+------------+
У меня есть мой код:
from pyspark.sql.types import *
from difflib import SequenceMatcher
from pyspark.sql.functions import col,when,lit,udf
contains = udf(lambda s, q: SequenceMatcher(None,"".join(sorted(s.lower())), "".join(sorted(q.lower()))).ratio()>=0.9, BooleanType())
join_condition = ((col("df1.pincode") == col("df2.pincode")) & (col("df1.state") == col("df2.state")))
result_df = df1.alias("df1").join(df2.alias("df2"), join_condition , "left").where(contains(col("df1.name"), col("df2.name")))
result = result_df.select("df1.*",when(col("df2.name").isNotNull(), lit(1)).otherwise(lit(0)).alias("validated"))
result.show()
Но вывод дает мне AttributeError: У объекта 'NoneType' нет атрибута 'lower' Я знаю, что столбец с несопоставленными значениями равен Null, поэтому s.lower () и p.lower () не работают, но как решить эту проблему. Я хочу, чтобы только это условие содержалось, чтобы выполнить процесс фильтрации.
Кроме того, мне нужно иметь столбец df2.name в результате, для которого я даю имена столбцов в списке:
cols = ["df1.name","df2.name","df1.state","df1.pincode"]
result = result_df.select(*cols,when(col("df2.name").isNotNull(), lit(1)).otherwise(lit(0)).alias("validated"))
Но снова я получаю сообщение об ошибке: SyntaxError: только именованные аргументы могут следовать * выражение
Любая помощь будет оценена. Спасибо.