Подпроцессы не выполняются при использовании многопроцессорности в Python - PullRequest
1 голос
/ 24 марта 2020

Сейчас я тестирую приведенный ниже код для параллельного вычисления массива, но кажется, что функция длительного времени не будет выполняться. Моя Python версия: 3.7.4, Операционная система: win 10.

from multiprocessing import Pool, Lock, Array
import os, time    

def long_time_task(i,array,lock):
    print('Run task %s (%s)...' % (i, os.getpid()))
    start = time.time()

    total_count = 0
    for k in range(5*10**7): total_count += 1
    total_count += i
    lock.acquire()
    array[i] = total_count
    lock.release()

    end = time.time()
    print('Task %s runs %0.2f seconds.' % (i, (end - start)))


def mainFunc():
    print('Parent process %s.' % os.getpid())
    p = Pool()
    array = Array('f', 20)
    lock = Lock()

    for i in range(20): p.apply_async(long_time_task, args=(i,array,lock))

    print('Waiting for all subprocesses done...')
    p.close()
    p.join()
    print('All subprocesses done.')


if __name__ == '__main__':    
    mainFunc()

1 Ответ

1 голос
/ 25 марта 2020

Есть несколько проблем с вашим кодом:

  1. Метод apply_async возвращает результат объект, который вам нужно ждать.

  2. Нельзя передавать обычные многопроцессорные объекты Array или Lock в методы пула, поскольку они не могут быть обработаны. Вместо этого вы можете использовать manager object.

Попробуйте это:

from multiprocessing import Pool, Lock, Array, Manager
import os, time

def long_time_task(i,array,lock):
    print('Run task %s (%s)...' % (i, os.getpid()))
    start = time.time()

    total_count = 0
    for k in range(5*10**7): total_count += 1
    total_count += i
    lock.acquire()
    array[i] = total_count
    lock.release()

    end = time.time()
    print('Task %s runs %0.2f seconds.' % (i, (end - start)))


def mainFunc():
    print('Parent process %s.' % os.getpid())
    p = Pool()
    m = Manager()
    array = m.Array('f', [0] * 20)
    lock = m.Lock()

    results  = [p.apply_async(long_time_task, args=(i,array,lock)) for i in range(20)]

    [result.get() for result in results]

    print('Waiting for all subprocesses done...')
    p.close()
    p.join()
    print('All subprocesses done.')


if __name__ == '__main__':
    mainFunc()

Вы также можете упростить это, но я не уверен если это будет работать в вашем случае:

import array
import os
import time

from multiprocessing import Pool


def long_time_task(i):
    print(f'Run task {i} ({os.getpid()})...')
    start = time.time()

    total_count = 0

    for k in range(5 * 10 ** 7):
        total_count += 1

    total_count += i
    end = time.time()

    print(f'Task {i} runs {end - start:.2f} seconds.')
    return total_count


def main():
    print('Parent process %s.' % os.getpid())

    a = array.array('d', range(20))
    r = range(20)

    with Pool() as pool:
        for idx, result in zip(r, pool.map(long_time_task, r)):
            a[idx] = result

    print(a)

    print(f'All subprocesses done.')


if __name__ == '__main__':
    main()
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...