Я собрал скрипт, который использует многопроцессорную библиотеку для выполнения своих задач. У меня есть датафрейм, строки которого не зависят друг от друга, который я хочу обрабатывать параллельно. Моя работа основана на этой статье.
Расчет основан на нескольких объектах, поэтому созданная мной функция для вызова pool.map (называемая processChunks) получает эти объекты, упакованные в список. Исходя из того, что вы можете передать только одну переменную в вызове карты. Список содержит следующие переменные:
- df_chunk = dataframe
- cols = Series
- EGV_features = list
- EGV_reward = Object (RandomForestRegressor)
Я ожидаю, что конечный результат в 'result' будет списком списков с каждым подсписком, являющимся возвращением от каждого дочернего процесса. Если у меня есть 10 подпроцессов, «результат» в настоящее время содержит 10 элементов NoneType. При обработке исключений ничего не отображается, и если я удаляю его, скрипт завершается с KeyError: 0. Может кто-нибудь помочь мне понять, что происходит? Большое спасибо.
Код:
def processChunks(poolData):
try:
# Unpack the poolData
cols = poolData['cols']
EGV_features = poolData['EGV_features']
EGV_reward = poolData['EGV_reward']
df_CASES = poolData['df_chunk']
""" Do a bunch of stuff with the data which results in two new dtaframes, temp_gs_df and temp_ga_df... """
return [temp_gs_df, temp_ga_df]
except Exception:
logging.exception("processChunks(%r) failed" % (poolData,))
def main():
# df_CASES is populated with data up here. Several 10s of '000 rows
import multiprocessing
NCPU = multiprocessing.cpu_count() - 1 if multiprocessing.cpu_count() > 2 else 1
pool = multiprocessing.Pool(processes=NCPU)
# calculate the df chunk size as an integer
chunk_size = int(df_CASES.shape[0]/NCPU)
# Split the df into chunk_size chunks. One chunk for each child process.
# will work even if the length of the dataframe is not evenly divisible by num_processes
chunks = [df_CASES.iloc[df_CASES.index[i:i + chunk_size]] for i in range(0, df_CASES.shape[0], chunk_size)]
# Package the chunks into dict objects along with the other data necessary for processing them.
# Pack the dicts into a list for iterating over when spawning child processes.
poolData=[]
for item in chunks:
poolData.append(dict([
('df_chunk', item),
('cols',cols),
('EGV_features',EGV_features),
('EGV_reward',EGV_reward)
]))
# apply our function to each chunk in the list
result = pool.map(processChunks, poolData)