Python вызов многопроцессорной функции завершается с KeyError: 0 - PullRequest
0 голосов
/ 03 апреля 2020

Я собрал скрипт, который использует многопроцессорную библиотеку для выполнения своих задач. У меня есть датафрейм, строки которого не зависят друг от друга, который я хочу обрабатывать параллельно. Моя работа основана на этой статье.

Расчет основан на нескольких объектах, поэтому созданная мной функция для вызова pool.map (называемая processChunks) получает эти объекты, упакованные в список. Исходя из того, что вы можете передать только одну переменную в вызове карты. Список содержит следующие переменные:

  • df_chunk = dataframe
  • cols = Series
  • EGV_features = list
  • EGV_reward = Object (RandomForestRegressor)

Я ожидаю, что конечный результат в 'result' будет списком списков с каждым подсписком, являющимся возвращением от каждого дочернего процесса. Если у меня есть 10 подпроцессов, «результат» в настоящее время содержит 10 элементов NoneType. При обработке исключений ничего не отображается, и если я удаляю его, скрипт завершается с KeyError: 0. Может кто-нибудь помочь мне понять, что происходит? Большое спасибо.

Код:

def processChunks(poolData):
    try:
        # Unpack the poolData
        cols = poolData['cols']
        EGV_features = poolData['EGV_features']
        EGV_reward = poolData['EGV_reward']
        df_CASES = poolData['df_chunk']

        """ Do a bunch of stuff with the data which results in two new dtaframes, temp_gs_df and temp_ga_df... """

           return [temp_gs_df, temp_ga_df]
    except Exception:
        logging.exception("processChunks(%r) failed" % (poolData,))


def main():
    # df_CASES is populated with data up here. Several 10s of '000 rows

    import multiprocessing
    NCPU = multiprocessing.cpu_count() - 1 if multiprocessing.cpu_count() > 2 else 1
    pool = multiprocessing.Pool(processes=NCPU)

    # calculate the df chunk size as an integer
    chunk_size = int(df_CASES.shape[0]/NCPU)    

    # Split the df into chunk_size chunks. One chunk for each child process.
    # will work even if the length of the dataframe is not evenly divisible by num_processes
    chunks = [df_CASES.iloc[df_CASES.index[i:i + chunk_size]] for i in range(0, df_CASES.shape[0], chunk_size)]

    # Package the chunks into dict objects along with the other data necessary for processing them.
    # Pack the dicts into a list for iterating over when spawning child processes.
    poolData=[]
    for item in chunks:
        poolData.append(dict([
                    ('df_chunk', item),
                    ('cols',cols),
                    ('EGV_features',EGV_features),
                    ('EGV_reward',EGV_reward)
                    ]))

    # apply our function to each chunk in the list
    result = pool.map(processChunks, poolData)
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...