Проверьте данные строки в одном pyspark Dataframe, сопоставленном в другом Dataframe - PullRequest
0 голосов
/ 17 февраля 2020

У меня есть 2 Pyspark Dataframe df1, df2. И df1, и df2 содержат миллионы записей.

df1 похож на:

+-------------------+--------+--------+
|               name|state   | pincode|
+-------------------+--------+--------+
|  CYBEX INTERNATION| HOUSTON| 00530  |
|        FLUID POWER| MEDWAY | 02053  |
|   REFINERY SYSTEMS| FRANCE | 072234 |
|    K N ENTERPRISES| MUMBAI | 100010 |
+-------------------+--------+--------+

df2 похож на:

+--------------------+--------+--------+
|               name |state   | pincode|
+--------------------+--------+--------+
|FLUID POWER PVT LTD | MEDWAY | 02053  |
|  CYBEX INTERNATION | HOUSTON| 02356  |
|REFINERY SYSTEMS LTD| MUMBAI | 072234 |
+--------------------+--------+--------+

Итак, я хочу проверить, является ли df1 найден в df2 или не основан на состоянии имени и Пинкоде, и вывод должен быть подтвержден, т. е. найденная строка будет 1, иначе 0 и df будет

+-------------------+--------+--------+--------- --+
|               name|state   | pincode|  Validated |
+-------------------+--------+--------+---------- -+
|  CYBEX INTERNATION| HOUSTON| 00530  |     0      |
|        FLUID POWER| MEDWAY | 02053  |     1      |
|   REFINERY SYSTEMS| FRANCE | 072234 |     0      |
|    K N ENTERPRISES| MUMBAI | 100010 |     0      |
+-------------------+--------+--------+------------+

В 1-м случае строки 1 из df1 Пинкод не ' t совпадает с любым столбцом PIN-кода df2, таким образом, проверено = 0
Во втором случае строки 2 PIN-кода df1 совпало, состояние также совпадает, и для столбца имени я использую Левенштейна для сопоставления имени столбца, и окончательная строка проверяется = 1
В 3-м ряду Пинкод совпадает, но состояние не совпадает и проверяется = 0
В 4-м Пинкоде нет и проверяется = 0

Я пытался сделать это с Pandas dataFrame во время итерации данных во вложенном if, но данные настолько огромны, что итерация не является хорошим выбором.

Я ожидаю ускорить процесс, используя pyspark и используя параллельную обработку, что-то вроде:

df_final = df1.withColumn('validated', if some_expression == True THEN 1,ELSE 0)

Но не аб чтобы выяснить some_expression и как проверить всю проверку df1 на другом df2 с заданными столбцами и без какой-либо итерации.

Я прошел через различные вопросы о свечах и похожие проблемы, но ни одна из них мне не помогла. Любая помощь будет оценена. Пожалуйста, прокомментируйте, если какая-либо информация не ясна.

1 Ответ

1 голос
/ 17 февраля 2020

Используя levenshtein-distance с левым соединением, вы можете сделать что-то вроде этого:

join_condition = (col("df1.pincode") == col("df2.pincode")) \
                 & (levenshtein(col("df1.name"), col("df2.name")) <= 10) \
                 & (col("df1.state") == col("df2.state"))

result_df = df1.alias("df1").join(df2.alias("df2"), join_condition , "left")

result_df.select("df1.*",
              when(col("df2.name").isNotNull(), lit(1)).otherwise(lit(0)).alias("validated")
              ).show()

#+-----------------+-------+-------+---------+
#|             name|  state|pincode|validated|
#+-----------------+-------+-------+---------+
#|CYBEX INTERNATION|HOUSTON|  00530|        0|
#|      FLUID POWER| MEDWAY|  02053|        1|
#| REFINERY SYSTEMS| FRANCE| 072234|        0|
#|  K N ENTERPRISES| MUMBAI| 100010|        0|
#+-----------------+-------+-------+---------+
...