Есть ли способ накапливать результаты вызовов pool.apply_async всякий раз, когда они доступны, не собирая их в структуре, подобной списку? - PullRequest
0 голосов
/ 17 октября 2019

В циклах for я отправляю задания с помощью вызовов python pool.apply_async ().

Приведенный ниже код на python изначально был написан мной, а затем отредактирован @Santiago Magariños

import multiprocessing
import numpy as np
from time import time, sleep
from random import random

chrNames=['chr1','chr2','chr3']
sims=[1,2,3]



def accumulate_chrBased_simBased_result(chrBased_simBased_result,accumulatedSignalArray,accumulatedCountArray):    
    signalArray = chrBased_simBased_result[0]
    countArray = chrBased_simBased_result[1]

    accumulatedSignalArray += signalArray
    accumulatedCountArray += countArray


def func(chrName,simNum):

    result=[]
    sleep(random()*5)
    signal_array=np.full((10000,), simNum, dtype=float)
    count_array = np.full((10000,), simNum, dtype=int)
    result.append(signal_array)
    result.append(count_array)
    print('%s %d' %(chrName,simNum))

    return result


if __name__ == '__main__':

    accumulatedSignalArray = np.zeros((10000,), dtype=float)
    accumulatedCountArray = np.zeros((10000,), dtype=int)

    numofProcesses = multiprocessing.cpu_count()
    pool = multiprocessing.Pool(numofProcesses)

    results = []
    for chrName in chrNames:
        for simNum in sims:
            results.append(pool.apply_async(func, (chrName,simNum,)))

    for i in results:
        print(i)

    while results:
        for r in results[:]:
            if r.ready():
                print('{} is ready'.format(r))
                accumulate_chrBased_simBased_result(r.get(),accumulatedSignalArray,accumulatedCountArray)
                results.remove(r)

    pool.close()
    pool.join()

    print(accumulatedSignalArray)
    print(accumulatedCountArray)

Есть ли способ накапливать результат вызова pool.apply_async () всякий раз, когда он доступен, не собирая их в списке, подобном структуре?

1 Ответ

2 голосов
/ 17 октября 2019

Нечто подобное может работать. Скопировав код и добавив обратный вызов, обратите внимание, что это работает, потому что пул объединяется до того, как мы получим доступ к значениям аккумулятора. Если нет, то нам нужен механизм синхронизации другого типа

class Accumulator:
    def __init__(self):
        self.signal = np.zeros((10000,), dtype=float)
        self.count = np.zeros((10000,), dtype=int)

    def on_result(self, result):
        self.signal += result[0]
        self.count += result[1]

if __name__ == '__main__':

    num_proc = multiprocessing.cpu_count()
    pool = multiprocessing.Pool(num_proc)

    accumulator = Accumulator()
    for chrName in chrNames:
        for simNum in sims:
            pool.apply_async(func, (chrName,simNum,), callback=accumulator.on_result)

    pool.close()
    pool.join()

    print(accumulator.signal)
    print(accumulator.count)
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...