Мультиобработка строк в Pandas для замены на REGEX - PullRequest
1 голос
/ 01 февраля 2020

Я имею дело с большими таблицами данных, 100M + строк. На некотором количестве столбцов мне нужно выполнить регулярное выражение для замены ряда терминов. Я предварительно скомпилировал и сохранил все термины в словаре для их использования. Пользователь выбирает столбцы для очистки. После замены данные затем сохраняются в другом файле CSV.

У меня есть решение для таблиц, которые помещаются в память, но не поддерживают многопроцессорность, поэтому он использует только одно ядро.

Я хотел бы перенести это на многопроцессорность, чтобы получить эти преимущества. Наиболее важные разделы моего кода:

def SendToFile(write_df):
    if i == 0:
        write_df.to_csv(writename, mode='w', index=None)            
    else:
        write_df.to_csv(writename, mode='a', index=None)
    return 1

def CleanTheChunk(clean_df):
    df=clean_df.copy()
    for elem in clean_col_index:
            col_name=raw_cols[elem]
            df[col_name].replace(scrub_comp, regex=True, inplace = True)
    return df

###
#read in data, pre-compile regex terms, select columns to scrub of terms etc.
###

if large_data==0:
    #read in the data
    df = pd.read_csv(filename, dtype='str')

    #clean the file in every column indicated:
    for elem in clean_col_index:
        col_name=raw_cols[elem]
        df[col_name].replace(scrub_comp, regex=True, inplace = True)
    #Save the cleaned version to file
    df.to_csv(writename, index=None)

else: #This is how it handles if it was large data
    i=0 #i is used to identify when the first chunk was written 'w' or 'a'   
    #read the file in chunks
    for chunk in pd.read_csv(filename, chunksize=csize, dtype='str'):

        #Clean the file:
        chunk = CleanTheChunk(chunk)
        #Save the file
        i=SendToFile(chunk)
print("Jobs done.")

Строки не влияют друг на друга, но их необходимо сохранить в новый CSV в правильном порядке. Я просто не могу сосредоточиться на том, как читать несколько кусков, обрабатывать их параллельно, а затем записывать в новый файл CSV в правильном порядке.

ОБНОВЛЕНИЕ Я попробовал новый метод. Я свел все логи c в одну функцию, после чего эта функция вызывается для сопоставления. Я постараюсь сделать функцию короче, чтобы добраться до ошибки, с которой я сейчас сталкиваюсь.

def MP_Cleaner(chunk):
    #read in the banned terms
    #Add escape characters to any control characters in the baned terms
    #Create the regex pattern
    #Iterate over the columns that need scrubbing
    #use chunk['col_name'].replace(regexterm, regex=true, inplace=true)
    return chunk

def parallelize(data, func):
    data_split = np.array_split(data, cores)
    pool = Pool(cores)
    data = pd.concat(pool.map(func, data_split))
    pool.close()
    pool.join()
    return data


df = pd.read_csv(filename, dtype='str')
if __name__ == '__main__':
    df_done=parallelize(df, MP_Cleaner)
    df_done.to_csv(writename, index=None)

    #That is it, all processing done nd file hould be saved
    print("Job Complete, "+ writename + " saved.")
    stop_time = time.strftime("%m/%d/%Y, %H:%M:%S", time.localtime() )
    print("Start time: " + start_time)
    print(" Stop time: " + stop_time)
    proceed=input("Press Enter to exit:")
    print(proceed)

Я получаю ошибку атрибута: у объекта 'list' нет атрибута 'replace'

1 Ответ

0 голосов
/ 04 февраля 2020

Разобрался. Я также использовал некоторый код из множества мест. Ускорение миллионов замен регулярных выражений в Python 3

и

http://blog.adeel.io/2016/11/06/parallelize-pandas-map-or-apply/

Окончательная запись в случае У кого-то есть схожие проблемы:

Работает только для файлов, которые умещаются в ОЗУ, но все же должен быть достаточно хорош для файлов, слишком больших для оперативной памяти, не отказываясь ни от каких преимуществ.

import multiprocessing as mp
import pandas as pd
import numpy as np
import time
import re
from multiprocessing import Pool
from trie import Trie

#Enter the filename of the csv to be scrubbed
#After processing the output will be have the prefix "CLEANED_" added to the 
#    filename provided
filename = "longest-2019-Oct.csv"

#How many cores to use, make sure you save one for overhead. The entire file
#    must fit in RAM for this method to work
cores = 9

#This is the file name for the scrubterms, that file must be in the same 
#  directory as this script. It must be a single column whose name is "items"
scrubfile="terms.csv"

#Enter the desired term to cover redactions, default is XXX
redact = "XXX"

#Columns to clean, they must be typed correctly, in "", seperated by commas
# to remove the columns earth, wind, and fire it would be
#  ["earth", "wind", "fire"]
cols=["massive"]

#***************DO NOT CHANGE ANYTHING BELOW THIS LINE*************************
writename="CLEANED_"+filename

def trie_regex_from_words(words):
    trie = Trie()
    for word in words:
        trie.add(word)
    return re.compile(r"\b" + trie.pattern() + r"\b", re.IGNORECASE)

#read in the terms to be cleaned

def MP_Cleaner(chunk):
    #read in the terms
    scrub_df= pd.read_csv(scrubfile, dtype='str')
    #Pull just the items
    my_scrub=scrub_df['items'].tolist()

    #The chars we must protect
    SpecialCharacters = [chr(92), chr(46), chr(94), chr(36), chr(42), chr(43), 
                         chr(63), chr(123), chr(125), chr(91), chr(93), 
                         chr(124), chr(40), chr(41), chr(34), chr(39)]

    #walk through terms and replace special characters with the escpae 
    #    character so they can be processed in regex properly
    for i in range(len(SpecialCharacters)):
        replacement = chr(92) + SpecialCharacters[i]
        my_scrub = [term.replace(SpecialCharacters[i], replacement ) for term in my_scrub]

    Trie_Scrub = trie_regex_from_words(my_scrub)

    for elem in cols:
        chunk[elem].replace(Trie_Scrub, value=redact, regex=True, inplace = True)

    return chunk

def parallelize(data, func):
    data_split = np.array_split(data, cores)
    pool = Pool(cores)
    data = pd.concat(pool.map(func, data_split))
    pool.close()
    pool.join()
    return data

start_time=time.strftime("%m/%d/%Y, %H:%M:%S", time.localtime() )

df = pd.read_csv(filename, dtype='str')
if __name__ == '__main__':
    df_done = parallelize(df, MP_Cleaner)
    df_done.to_csv(writename, index=None)

    #That is it, all processing done nd file hould be saved
    print("Job Complete, "+ writename + " saved.")
    stop_time = time.strftime("%m/%d/%Y, %H:%M:%S", time.localtime() )
    print("Start time: " + start_time)
    print(" Stop time: " + stop_time)
    proceed=input("Press Enter then close the window to exit:")
    print(proceed)
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...