Я имею дело с большими таблицами данных, 100M + строк. На некотором количестве столбцов мне нужно выполнить регулярное выражение для замены ряда терминов. Я предварительно скомпилировал и сохранил все термины в словаре для их использования. Пользователь выбирает столбцы для очистки. После замены данные затем сохраняются в другом файле CSV.
У меня есть решение для таблиц, которые помещаются в память, но не поддерживают многопроцессорность, поэтому он использует только одно ядро.
Я хотел бы перенести это на многопроцессорность, чтобы получить эти преимущества. Наиболее важные разделы моего кода:
def SendToFile(write_df):
if i == 0:
write_df.to_csv(writename, mode='w', index=None)
else:
write_df.to_csv(writename, mode='a', index=None)
return 1
def CleanTheChunk(clean_df):
df=clean_df.copy()
for elem in clean_col_index:
col_name=raw_cols[elem]
df[col_name].replace(scrub_comp, regex=True, inplace = True)
return df
###
#read in data, pre-compile regex terms, select columns to scrub of terms etc.
###
if large_data==0:
#read in the data
df = pd.read_csv(filename, dtype='str')
#clean the file in every column indicated:
for elem in clean_col_index:
col_name=raw_cols[elem]
df[col_name].replace(scrub_comp, regex=True, inplace = True)
#Save the cleaned version to file
df.to_csv(writename, index=None)
else: #This is how it handles if it was large data
i=0 #i is used to identify when the first chunk was written 'w' or 'a'
#read the file in chunks
for chunk in pd.read_csv(filename, chunksize=csize, dtype='str'):
#Clean the file:
chunk = CleanTheChunk(chunk)
#Save the file
i=SendToFile(chunk)
print("Jobs done.")
Строки не влияют друг на друга, но их необходимо сохранить в новый CSV в правильном порядке. Я просто не могу сосредоточиться на том, как читать несколько кусков, обрабатывать их параллельно, а затем записывать в новый файл CSV в правильном порядке.
ОБНОВЛЕНИЕ Я попробовал новый метод. Я свел все логи c в одну функцию, после чего эта функция вызывается для сопоставления. Я постараюсь сделать функцию короче, чтобы добраться до ошибки, с которой я сейчас сталкиваюсь.
def MP_Cleaner(chunk):
#read in the banned terms
#Add escape characters to any control characters in the baned terms
#Create the regex pattern
#Iterate over the columns that need scrubbing
#use chunk['col_name'].replace(regexterm, regex=true, inplace=true)
return chunk
def parallelize(data, func):
data_split = np.array_split(data, cores)
pool = Pool(cores)
data = pd.concat(pool.map(func, data_split))
pool.close()
pool.join()
return data
df = pd.read_csv(filename, dtype='str')
if __name__ == '__main__':
df_done=parallelize(df, MP_Cleaner)
df_done.to_csv(writename, index=None)
#That is it, all processing done nd file hould be saved
print("Job Complete, "+ writename + " saved.")
stop_time = time.strftime("%m/%d/%Y, %H:%M:%S", time.localtime() )
print("Start time: " + start_time)
print(" Stop time: " + stop_time)
proceed=input("Press Enter to exit:")
print(proceed)
Я получаю ошибку атрибута: у объекта 'list' нет атрибута 'replace'