Pyspark regexp_replace с элементами списка не заменяет строку - PullRequest
0 голосов
/ 03 мая 2018

Я пытаюсь заменить строку в столбце данных, используя regexp_replace. Я должен применить шаблоны регулярных выражений ко всем записям в столбце данных. Но строки не заменяют, как ожидалось.

from pyspark import SparkContext, SparkConf
from pyspark.sql import SQLContext
from pyspark import sql
from  pyspark.sql.functions import regexp_replace,col
import re

conf = SparkConf().setAppName("myFirstApp").setMaster("local")
sc = SparkContext(conf=conf)
sqlContext = sql.SQLContext(sc)


df=sc.parallelize([('2345','ADVANCED by John'),
('2398','ADVANCED by ADVANCE'),
('2328','Verified by somerandomtext'),
('3983','Double Checked by Marsha')]).toDF(['ID', "Notes"])

reg_patterns=["ADVANCED|ADVANCE/ADV/","ASSOCS|AS|ASSOCIATES/ASSOC/"]

for i in range(len(reg_patterns)):
        res_split=re.findall(r"[^/]+",reg_patterns[i])
        res_split[0]
        df=df.withColumn('NotesUPD',regexp_replace(col('Notes'),res_split[0],res_split[1]))

df.show()

Выход:

+----+--------------------+--------------------+
|  ID|               Notes|            NotesUPD|
+----+--------------------+--------------------+
|2345|    ADVANCED by John|    ADVANCED by John|
|2398| ADVANCED by ADVANCE| ADVANCED by ADVANCE|
|2328|Verified by somer...|Verified by somer...|
|3983|Double Checked by...|Double Checked by...|
+----+--------------------+--------------------+

Expected Output:

+----+--------------------+--------------------+
|  ID|               Notes|            NotesUPD|
+----+--------------------+--------------------+
|2345|    ADVANCED by John|    ADV by John|
|2398| ADVANCED by ADVANCE|    ADV by ADV |
|2328|Verified by somer...|Verified by somer...|
|3983|Double Checked by...|Double Checked by...|
+----+--------------------+--------------------+

Ответы [ 3 ]

0 голосов
/ 03 мая 2018

Проблема в том, что ваш код многократно перезаписывает предыдущие результаты, начиная с начала. Вместо этого вы должны опираться на предыдущие результаты:

notes_upd = col('Notes')

for i in range(len(reg_patterns)):
    res_split=re.findall(r"[^/]+",reg_patterns[i])
    res_split[0]
    notes_upd = regexp_replace(notes_upd, res_split[0],res_split[1])

и вы получите желаемый результат:

df.withColumn('NotesUPD', notes_upd).show()

# +----+--------------------+--------------------+
# |  ID|               Notes|            NotesUPD|
# +----+--------------------+--------------------+
# |2345|    ADVANCED by John|         ADV by John|
# |2398| ADVANCED by ADVANCE|          ADV by ADV|
# |2328|Verified by somer...|Verified by somer...|
# |3983|Double Checked by...|Double Checked by...|
# +----+--------------------+--------------------+
0 голосов
/ 15 августа 2018

Предыдущие решения ограничены короткой длиной reg_patterns. Приведенная ниже реализация хорошо масштабируется, когда шаблон нормализации имеет много записей (например, исправление орфографии с использованием пользовательского словаря).

Начните с отображения списка reg_patterns в словарь:

from pyspark.sql.functions import col, udf

def parse_string(s, or_delim, target_delim):
    keys,value = s.rstrip(target_delim).rsplit(target_delim)
    return {key:value for key in keys.split(or_delim)}

reg_patterns=["ADVANCED|ADVANCE/ADV/","ASSOCS|AS|ASSOCIATES/ASSOC/"]

normalization_dict = {}
for item in reg_patterns:
    normalization_dict.update(parse_string(item, "|", "/"))

Завершите нормализацию столбца DataFrame «Примечания» с помощью функции карри следующим образом:

def my_norm_func(s, ngram_dict, pattern):
    return pattern.sub(lambda x: ngram_dict[x.group()], s)

norm_pattern = re.compile(r'\b(' + '|'.join([re.escape(item)\
                      for item in normalization_dict.keys()]) + r')\b')
my_norm_udf = udf(lambda s: my_norm_func(s, normalization_dict, norm_pattern))
df = df.withColumn("NotesUPD", my_norm_udf(col("Notes")))

дает следующий желаемый результат:

+----+--------------------+--------------------+
|  ID|               Notes|            NotesUPD|
+----+--------------------+--------------------+
|2345|    ADVANCED by John|         ADV by John|
|2398| ADVANCED by ADVANCE|          ADV by ADV|
|2328|Verified by somer...|Verified by somer...|
|3983|Double Checked by...|Double Checked by...|
+----+--------------------+--------------------+
0 голосов
/ 03 мая 2018

Вы должны написать udf функцию и цикл в вашем reg_patterns, как показано ниже

reg_patterns=["ADVANCED|ADVANCE/ADV/","ASSOCS|AS|ASSOCIATES/ASSOC/"]

import re
from pyspark.sql import functions as f
from pyspark.sql import types as t
def replaceUdf(column):
    res_split=[]
    for i in range(len(reg_patterns)):
        res_split=re.findall(r"[^/]+",reg_patterns[i])
        for x in res_split[0].split("|"):
            column = column.replace(x,res_split[1])
    return column

reg_replaceUdf = f.udf(replaceUdf, t.StringType())

df = df.withColumn('NotesUPD', reg_replaceUdf(f.col('Notes')))
df.show()

и у вас должно быть

+----+--------------------+--------------------+
|  ID|               Notes|            NotesUPD|
+----+--------------------+--------------------+
|2345|    ADVANCED by John|         ADV by John|
|2398| ADVANCED by ADVANCE|          ADV by ADV|
|2328|Verified by somer...|Verified by somer...|
|3983|Double Checked by...|Double Checked by...|
+----+--------------------+--------------------+
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...