Создать пул для каждого параллельного процесса - PullRequest
0 голосов
/ 29 апреля 2020

Я работаю над кодом, который итеративно соответствует модели, состоящей из ncomp компонентов. Каждый компонент соответствует с emcee. Я хотел бы разместить компоненты параллельно, и я хотел бы также распараллелить emcee. Идея состоит в том, что каждый процесс создает свой собственный пул для emcee. Каков наилучший способ сделать это? Я подумал, что мне, вероятно, следует использовать MPIPoolExecutor и spawn, но не смог найти никаких простых и понятных примеров на начальном уровне. Я запускаю код в кластере.

Это упрощенная версия:

import numpy as np
from mpi4py import MPI

comm = MPI.COMM_WORLD
size=comm.Get_size()
rank=comm.Get_rank()

if rank == 0:
    # Init
    naivefit = NaiveFit()
    naivefit.first_run_fit()

    # SPLIT DATA into multiple processes
    ncomps = len(naivefit.prev_result['comps'])
    comps = np.array_split(range(ncomps), size)
else:
    naivefit = None
    comps = None

while True:
    # BROADCAST CONSTANTS
    naivefit = comm.bcast(naivefit, root=0)  # updated params

    # SCATTER DATA
    comps = comm.scatter(comps, root=0)

    # Create a pool here for each process... Perhaps it would be better to
    # create it outside the `while` loop?
    pool=???

    # FIT
    all_results_rank = []
    for comp in comps:
        result = naivefit.fit_using_emcee(i=comp, pool=pool)
        all_results_rank.append(result)

    # GATHER DATA AND UPDATE NAIVEFIT
    all_results_tmp = comm.gather(all_results_rank, root=0)
    if rank == 0:
        all_results = list(itertools.chain.from_iterable(all_results_tmp))

        # 'terminate' tells the loop to terminate if the fit converged.
        terminate = naivefit.run_fit_gather_results_multiproc(all_results)

        # Within naivefit: ncomps=ncomps+1. 
        # Now update ncomps and comps in the code.
        ncomps = len(naivefit.prev_result['comps'])
        comps = np.array_split(range(ncomps), size)
    else:
        terminate=None
        naivefit=None
        comps=None

    terminate = comm.bcast(terminate, root=0)

    if terminate:
        break

1 Ответ

0 голосов
/ 30 апреля 2020

Я бы предпочел запустить программу с процессом NxP и создать N-субкоммуникатор размера P с MPI_Comm_split () (не привязка python, может быть, comm.split ()?). Затем используйте этот коммуникатор внутри fit_using_emcee (), который также должен быть реализован с использованием MPI. Я не использую порождение, потому что я не знаю, как планировщик заданий в кластерах позволяет это.

...