Описание: у меня уже есть последовательный и многопоточный код Python.В общем, для каждой строки в кадре данных цикл проходит через кадр данных, чтобы объединить 2 ячейки в одну строку, обработать объединенную строку и вернуть результат.
Проблема: Весь процесс очень медленный (я думаю, что этоможет занять 4-10 дней) и я хочу разделить информационный кадр на 4 или более частей, чтобы потоки обрабатывали каждую часть.Я пытался создавать потоки, но не увеличил производительность после теста с меньшим кадром данных.Общее время выполнения простого / последовательного сценария составляло 30 секунд, а потоки заканчивались за 35 секунд, что означало, что потоки были медленнее и не быстрее.
Вопрос: Как я могу ускорить процесс?
То, что я пыталсяПоследовательный подход - успешный запуск, но медленное выполнение: в простом цикле результаты сохраняются в отдельном фрейме данных и сохраняются как .csv.
Потоковый подход - успешный запуск, но даже более медленное выполнение: я попытался разбить фрейм данныхпо частям и создавать темы.Каждый поток работает на той части, которая ему дана.Затем результаты берутся и объединяются в конечный фрейм данных и сохраняются в формате .csv.
Подход с использованием очереди (потока) - не выполнен: я также пытался кодировать с помощью queue (), но я был далек от успешного выполнения.Мне, очевидно, нужно больше изучать эту тему.Будет ли это быстрее и стоит попробовать?
#code with threads
import pandas as pd
import numpy as np
def myfunction(myvar):
if len(myvar) > 256:
return False
return True
def mythread(df, resultdf):
for index, x in df.iterrows():
for index2, y in data.iterrows():
if ( x['mycol'][:1] != y['mycol'][:1] ):
combined=x['mycol']+""+y['mycol']
if(myfunction(combined)):
resultdf = resultdf.append({'mycol': combined}, ignore_index=True)
return resultdf
#Start of main
data=pd.read_csv('data/small.csv')
combined_from_threads = pd.DataFrame(columns=['mycol'])
df1,df2,df3,df4=np.array_split(data, 4)
resultdf = pd.DataFrame(columns=['mycol'])
from multiprocessing.pool import ThreadPool
pool = ThreadPool(processes=4)
async_result1 = pool.apply_async(mythread, (df1, resultdf))
async_result2 = pool.apply_async(mythread, (df2, resultdf))
async_result3 = pool.apply_async(mythread, (df3, resultdf))
async_result4 = pool.apply_async(mythread, (df4, resultdf))
return1 = async_result1.get()
return2 = async_result2.get()
return3 = async_result3.get()
return4 = async_result4.get()
combined_from_threads = pd.concat([return1, return2, return3, return4], axis = 0, sort=True)
combined_from_threads.to_csv('result_threads.csv',index = False)
#sequential - simple
data=pd.read_csv('data/small.csv')
resultdf = pd.DataFrame(columns=['mycol'])
for index, x in data.iterrows():
print(str(index)+") "+x['mycol'])
for index2, y in data.iterrows():
if ( x['mycol'][:1] != y['mycol'][:1] ):
combined=x['mycol']+""+y['mycol']
#print(combined)
if(myfunction(combined)):
#print(combined+" found!")
resultdf = resultdf.append({'mycol': combined}, ignore_index=True)
resultdf.to_csv('result_sequential.csv',index = False)
В последовательном подходе одно из моих ядерных процессоров достигает пиковых значений почти на 100%, в то время как другие процессорные ядра простаивают.Время выполнения (приблизительное среднее): 30 с
В многопоточном подходе все использование ядра процессора достигает 25% и остается там до тех пор, пока Python не завершит работу.Насколько я понимаю, он должен показывать более высокий уровень использования, чтобы код Python быстрее заканчивался из-за созданных потоков.Время выполнения (грубое среднее): 35 секунд, в то время как оно должно быть 10 секунд или меньше