Разделить Dataframe, заставить потоки обрабатывать каждую часть, затем объединить Dataframe - PullRequest
0 голосов
/ 15 июня 2019

Описание: у меня уже есть последовательный и многопоточный код Python.В общем, для каждой строки в кадре данных цикл проходит через кадр данных, чтобы объединить 2 ячейки в одну строку, обработать объединенную строку и вернуть результат.

Проблема: Весь процесс очень медленный (я думаю, что этоможет занять 4-10 дней) и я хочу разделить информационный кадр на 4 или более частей, чтобы потоки обрабатывали каждую часть.Я пытался создавать потоки, но не увеличил производительность после теста с меньшим кадром данных.Общее время выполнения простого / последовательного сценария составляло 30 секунд, а потоки заканчивались за 35 секунд, что означало, что потоки были медленнее и не быстрее.

Вопрос: Как я могу ускорить процесс?

То, что я пыталсяПоследовательный подход - успешный запуск, но медленное выполнение: в простом цикле результаты сохраняются в отдельном фрейме данных и сохраняются как .csv.

Потоковый подход - успешный запуск, но даже более медленное выполнение: я попытался разбить фрейм данныхпо частям и создавать темы.Каждый поток работает на той части, которая ему дана.Затем результаты берутся и объединяются в конечный фрейм данных и сохраняются в формате .csv.

Подход с использованием очереди (потока) - не выполнен: я также пытался кодировать с помощью queue (), но я был далек от успешного выполнения.Мне, очевидно, нужно больше изучать эту тему.Будет ли это быстрее и стоит попробовать?

#code with threads
import pandas as pd
import numpy as np

def myfunction(myvar): 
    if len(myvar) > 256: 
        return False
    return True

def mythread(df, resultdf):
    for index, x in df.iterrows():
        for index2, y in data.iterrows():
            if ( x['mycol'][:1] != y['mycol'][:1] ):
                combined=x['mycol']+""+y['mycol']
                if(myfunction(combined)):
                    resultdf = resultdf.append({'mycol': combined}, ignore_index=True)
    return resultdf


#Start of main
data=pd.read_csv('data/small.csv')
combined_from_threads = pd.DataFrame(columns=['mycol'])

df1,df2,df3,df4=np.array_split(data, 4)
resultdf = pd.DataFrame(columns=['mycol'])

from multiprocessing.pool import ThreadPool
pool = ThreadPool(processes=4)

async_result1 = pool.apply_async(mythread, (df1, resultdf)) 
async_result2 = pool.apply_async(mythread, (df2, resultdf)) 
async_result3 = pool.apply_async(mythread, (df3, resultdf)) 
async_result4 = pool.apply_async(mythread, (df4, resultdf)) 
return1 = async_result1.get()
return2 = async_result2.get()
return3 = async_result3.get()
return4 = async_result4.get()

combined_from_threads = pd.concat([return1, return2, return3, return4], axis = 0, sort=True)
combined_from_threads.to_csv('result_threads.csv',index = False)
#sequential - simple
data=pd.read_csv('data/small.csv')
resultdf = pd.DataFrame(columns=['mycol'])

for index, x in data.iterrows():
    print(str(index)+") "+x['mycol'])
    for index2, y in data.iterrows():
        if ( x['mycol'][:1] != y['mycol'][:1] ):
            combined=x['mycol']+""+y['mycol']
            #print(combined)
            if(myfunction(combined)):
                #print(combined+" found!")
                resultdf = resultdf.append({'mycol': combined}, ignore_index=True)

resultdf.to_csv('result_sequential.csv',index = False)

В последовательном подходе одно из моих ядерных процессоров достигает пиковых значений почти на 100%, в то время как другие процессорные ядра простаивают.Время выполнения (приблизительное среднее): 30 с

В многопоточном подходе все использование ядра процессора достигает 25% и остается там до тех пор, пока Python не завершит работу.Насколько я понимаю, он должен показывать более высокий уровень использования, чтобы код Python быстрее заканчивался из-за созданных потоков.Время выполнения (грубое среднее): 35 секунд, в то время как оно должно быть 10 секунд или меньше

...