multiprocessing.Pool.map выдает MemoryError - PullRequest
1 голос
/ 14 апреля 2020

Я переписываю Reinforcement Learning Framework от последовательного выполнения кода к параллельному (многопроцессорному), чтобы сократить время обучения. Это работает, но после нескольких часов обучения выдается MemoryError. Я попытался добавить gc.collect после каждого l oop без изменений.

Вот для l oop, который использует многопроцессорность:

for episode in episodes:
    env.episode = episode
    flex_list = [0,1,2]                                                                                          
    for machine in env.list_of_machines:                                                                            
        flex_plan = []                                                                                              
        for time_step in range(0,env.steplength):
            flex_plan.append(random.choice(flex_list))
        machine.flex_plan = flex_plan
    env.current_step = 0                                                                                            
    steps = []
    state = env.reset(restricted=True)                                                                              
    steps.append(state)

    # multiprocessing part, has condition to use a specific amount of CPUs or 'all' of them
    ####################################################
    func_part = partial(parallel_pool, episode=episode, episodes=episodes, env=env, agent=agent, state=state, log_data_qvalues=log_data_qvalues, log_data=log_data, steps=steps)
    if CPUs_used == 'all':
        mp.Pool().map(func_part, range(env.steplength-1))
    else:
        mp.Pool(CPUs_used).map(func_part, range(env.steplength-1))
    ############################################################
    # model is saved periodically, not only in the end
    save_interval = 100 #set episode interval to save models
    if (episode + 1) % save_interval == 0:
        agent.save_model(f'models/model_{filename}_{episode + 1}')
        print(f'model saved at episode {episode + 1}')

    plt.close()
    gc.collect()

Вывод после 26 эпизодов обучения:

Episode: 26/100   Action: 1/11    Phase: 3/3    Measurement Count: 231/234   THD fake slack: 0.09487   Psoll: [0.02894068 0.00046048 0.         0.        ]    Laptime: 0.181
Episode: 26/100   Action: 1/11    Phase: 3/3    Measurement Count: 232/234   THD fake slack: 0.09488   Psoll: [0.02894068 0.00046048 0.         0.        ]    Laptime: 0.181
Episode: 26/100   Action: 1/11    Phase: 3/3    Measurement Count: 233/234   THD fake slack: 0.09489   Psoll: [0.02894068 0.00046048 0.         0.        ]    Laptime: 0.179
Traceback (most recent call last):
  File "C:/Users/Artur/Desktop/RL_framework/train.py", line 87, in <module>
    main()
  File "C:/Users/Artur/Desktop/RL_framework/train.py", line 77, in main
    duration = cf.training(episodes, env, agent, filename, topology=topology, multi_processing=multi_processing, CPUs_used=CPUs_used)
  File "C:\Users\Artur\Desktop\RL_framework\help_functions\custom_functions.py", line 166, in training
    save_interval = parallel_training(range(episodes), env, agent, log_data_qvalues, log_data, filename, CPUs_used)
  File "C:\Users\Artur\Desktop\RL_framework\help_functions\custom_functions.py", line 81, in parallel_training
    mp.Pool().map(func_part, range(env.steplength-1))
  File "C:\Users\Artur\Anaconda\lib\multiprocessing\pool.py", line 268, in map
    return self._map_async(func, iterable, mapstar, chunksize).get()
  File "C:\Users\Artur\Anaconda\lib\multiprocessing\pool.py", line 657, in get
    raise self._value
  File "C:\Users\Artur\Anaconda\lib\multiprocessing\pool.py", line 431, in _handle_tasks
    put(task)
  File "C:\Users\Artur\Anaconda\lib\multiprocessing\connection.py", line 206, in send
    self._send_bytes(_ForkingPickler.dumps(obj))
  File "C:\Users\Artur\Anaconda\lib\multiprocessing\reduction.py", line 51, in dumps
    cls(buf, protocol).dump(obj)
MemoryError

Есть ли способ это исправить?

1 Ответ

1 голос
/ 21 апреля 2020

Когда вы создаете процессы в l oop, я полагаю, что ваша память забита, потому что процессы, которые вы создаете, остаются зависшими после того, как они завершают работу sh.

Из документа

Предупреждение: объекты multiprocessing.pool имеют внутренние ресурсы, которые необходимо правильно управлять (как и любой другой ресурс), используя пул как менеджер контекста или путем вызова close () и terminate () вручную. Невыполнение этого требования может привести к зависанию процесса при завершении. Обратите внимание, что некорректно полагаться на сборщик мусора для уничтожения пула, так как CPython не гарантирует, что будет вызван финализатор пула (см. Object. del () для получения дополнительной информации). ).

Я предлагаю вам немного реорганизовать свой код:

# set the CPUs_used to a desired number or None to use all available CPUs
with mp.Pool(processes=CPUs_used) as p:
    p.map(func_part, range(env.steplength-1))

Или вы можете вручную .close() и .join(), в зависимости от того, что больше подходит вашему стилю кодирования.

...