Waiting for pool.apply_async inside a loop - PullRequest
0 votes
/ April 4, 2020

This is my first attempt at using multiprocessing in my Python code. I'm stuck because I can't get apply_async to wait until all of its subprocesses have finished. I want to process the items in small chunks and save the results as I go through a long list of items.

As a simpler example:

import multiprocessing as mp

def fun(x, y):
    print("here")
    return(x+y)

buffer = []

for val in range(10):
    buffer.append(val)
    print(f'Added value: {val}')
    if len(buffer) == 5:
        #It is my understanding, this is necessary on Windows
        if __name__ == "__main__":
            pool = mp.Pool()
            res = [pool.apply_async(fun, args = (x,x)) for x in buffer]
            res = [r.wait() for r in res]
            print(f'Results: {res}')
            buffer = []
            pool.close()
            pool.join()

I would like this to produce the following output:

Added value: 0
Added value: 1
Added value: 2
Added value: 3
Added value: 4
Here
Here
Here
Here
Here
Results: [0, 2, 4, 6, 8]
Added value: 5
Added value: 6
Added value: 7
Added value: 8
Added value: 9
Here
Here
Here
Here
Here
Results: [10, 12, 14, 16, 18]

But it actually produces this (at least on my machine):

Added value: 0
Added value: 1
Added value: 2
Added value: 3
Added value: 4
Added value: 0
Added value: 1
Added value: 2
Added value: 3
Added value: 4
Added value: 5
Added value: 6
Added value: 7
Added value: 8
Added value: 9
Added value: 0
Added value: 1
Added value: 2
Added value: 3
Added value: 4
Added value: 5
Added value: 6
Added value: 7
Added value: 8
Added value: 9
Added value: 0
Added value: 1
Added value: 2
Added value: 3
Added value: 4
Added value: 5
Added value: 6
Added value: 7
Added value: 8
Added value: 9
Added value: 0
Added value: 1
Added value: 2
Added value: 3
Added value: 4
Added value: 5
Added value: 6
Added value: 7
Added value: 8
Added value: 9
Here
Here
Here
Here
Here
Results: [None, None, None, None, None]
Added value: 5
Added value: 6
Added value: 7
Added value: 8
Added value: 9
Added value: 0
Added value: 1
Added value: 2
Added value: 3
Added value: 4
Added value: 5
Added value: 6
Added value: 7
Added value: 8
Added value: 9
Added value: 0
Added value: 1
Added value: 2
Added value: 3
Added value: 4
Added value: 5
Added value: 6
Added value: 7
Added value: 8
Added value: 9
Added value: 0
Added value: 1
Added value: 2
Added value: 3
Added value: 4
Added value: 5
Added value: 6
Added value: 7
Added value: 8
Added value: 9
Added value: 0
Added value: 1
Added value: 2
Added value: 3
Added value: 4
Added value: 5
Added value: 6
Added value: 7
Added value: 8
Added value: 9
Here
Here
Here
Here
Here
Results: [None, None, None, None, None]

Any suggestion is much appreciated.

1 Answer

1 vote
/ April 4, 2020

Try putting the whole for loop inside the conditional block.

...
if __name__ == '__main__':

    for val in range(10):
        buffer.append(val)
        print(f'Added value: {val}')
        if len(buffer) == 5:
            pool = mp.Pool()
            res = [pool.apply_async(fun, args = (x,x)) for x in buffer]
            # wait til they are ALL done ?
            for r in res:
                r.wait()
            # get the return values
            res = [r.get() for r in res]
            print(f'Results: {res}')
            buffer = []
            pool.close()
            pool.join()
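
The [None, None, None, None, None] in the question comes from collecting the return value of wait(): AsyncResult.wait() only blocks and returns None, while get() blocks and returns the function's result. A minimal sketch of the difference follows. It uses multiprocessing.pool.ThreadPool, which exposes the same apply_async interface as mp.Pool, purely so the snippet runs anywhere without the __main__ guard (that substitution is my assumption, not part of the question), and wait_vs_get is a hypothetical helper name.

```python
from multiprocessing.pool import ThreadPool  # same API as mp.Pool, but thread-based


def fun(x, y):
    return x + y


def wait_vs_get(values):
    """Hypothetical helper: collect what wait() and get() each return."""
    with ThreadPool(2) as pool:
        pending = [pool.apply_async(fun, args=(x, x)) for x in values]
        waited = [r.wait() for r in pending]   # blocks until done, returns None
        fetched = [r.get() for r in pending]   # blocks until done, returns the value
    return waited, fetched


waited, fetched = wait_vs_get([0, 1, 2, 3, 4])
print(waited)   # [None, None, None, None, None]
print(fetched)  # [0, 2, 4, 6, 8]
```

Since get() already blocks until the result is ready, the separate wait() loop is optional.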

Here is your original with some extra instrumentation. I still don't know why, but it looks like the statements in the for loop are being run in several Python processes.

import multiprocessing as mp
import os
import pickle
import sys

def fun(x, y, pid=None):
    print(f"here pid:{pid}",file=sys.stderr)
    return (x+y,pid)

buffer = []
stuff = []

with open(r'c:\pyProjects\stuff.pkl','wb') as f:
    pickle.dump(stuff,f)

for val in range(10):
    buffer.append(val)
    pid = os.getpid()
    print(f'Added value: {val}.   pid={pid}')
    d = {'val':val,'pid':pid}
    with open(r'c:\pyProjects\stuff.pkl','rb') as f:
        try:
            stuff = pickle.load(f)
            stuff.append(d)
        except EOFError as e:
            s = '\n'.join(f'\t\t\t\t{item}' for item in stuff)
            print(f'\t\t\tEOFError {d}\n\t\t\tstuff:\n{s}\n')
    with open(r'c:\pyProjects\stuff.pkl','wb') as f:
        pickle.dump(stuff,f)
    if len(buffer) == 5:
        print(buffer)
        #It is my understanding, this is necessary on Windows
        if __name__ == "__main__":
            pool = mp.Pool()
            res = [pool.apply_async(fun, args = (x,x,pid)) for x in buffer]
            res = [r.get() for r in res]
            print(f'\t\t\tResults: {res}')
            buffer = []
            pool.close()
            pool.join()

After it finishes, you can load and inspect the pickled file with

>>> import pickle
>>> from pprint import pprint
>>> with open(r'c:\pyProjects\stuff.pkl','rb') as f:
...     a = pickle.load(f)

>>> a.sort(key=lambda x: x['pid'])
>>> pprint(a)
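
A likely explanation for the repeated "Added value" lines: on Windows, multiprocessing starts workers with the spawn method, and each spawned worker re-imports the main module. Any module-level code outside the if __name__ == "__main__": guard therefore runs again in every worker, which is why the loop shows up under several pids. The sketch below keeps only definitions at module level and reuses one pool for all chunks. The names chunks and run_chunked are hypothetical helpers introduced for illustration, not part of the original code.

```python
import multiprocessing as mp


def fun(x, y):
    return x + y


def chunks(items, size):
    """Yield consecutive slices of `items`, each with at most `size` elements."""
    for start in range(0, len(items), size):
        yield items[start:start + size]


def run_chunked(pool, items, size=5):
    """Submit one chunk at a time and yield its results once all are ready."""
    for chunk in chunks(items, size):
        pending = [pool.apply_async(fun, args=(x, x)) for x in chunk]
        # get() blocks, so the next chunk is submitted only after this one is done
        yield [r.get() for r in pending]


if __name__ == "__main__":
    # Only the parent process reaches this block; workers just import the definitions.
    with mp.Pool() as pool:
        for batch in run_chunked(pool, list(range(10))):
            print(f"Results: {batch}")  # save or process each batch here
```

Creating the pool once and passing it in avoids paying the worker start-up cost for every chunk, which is noticeable with spawn.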