проблема многопроцессорной блокировки Python - PullRequest
5 голосов
/ 26 ноября 2011

Я хочу добавить список диктов вместе с модулем многопроцессорной обработки Python.

Вот упрощенная версия моего кода:

#!/usr/bin/python2.7
# -*- coding: utf-8 -*-

import multiprocessing
import functools
import time

def merge(lock, d1, d2):
    time.sleep(5) # some time consuming stuffs
    with lock:
        for key in d2.keys():
            if d1.has_key(key):
                d1[key] += d2[key]
            else:
                d1[key] = d2[key]

l = [{ x % 10 : x } for x in range(10000)]
lock = multiprocessing.Lock()
d = multiprocessing.Manager().dict()

partial_merge = functools.partial(merge, d1 = d, lock = lock)

pool_size = multiprocessing.cpu_count()
pool = multiprocessing.Pool(processes = pool_size)
pool.map(partial_merge, l)
pool.close()
pool.join()

print d
  1. Я получил этоошибка при запуске этого скрипта.Как мне решить эту проблему?

    RuntimeError: Lock objects should only be shared between processes through inheritance

  2. нужна ли функция lock in merge в этом состоянии?или python позаботится об этом?

  3. Я думаю, что map должен сделать, это сопоставить что-то из одного списка в другой, а не сбрасывать все вещи в одном списке в одинобъект.Так есть ли более элегантный способ делать такие вещи?

1 Ответ

11 голосов
/ 26 ноября 2011

Следующее должно запускаться кроссплатформенно (то есть в Windows тоже) в Python 2 и 3. Он использует инициализатор пула процессов, чтобы установить dict менеджера как глобальный в каждом дочернем процессе.

FYI:

  • Использование блокировки не требуется с помощью команды dict.
  • Количество процессов в Pool по умолчанию равно количеству ЦП.
  • Если вас не интересует результат, вы можете использовать apply_async вместо map.
import multiprocessing
import time

def merge(d2):
    time.sleep(1) # some time consuming stuffs
    for key in d2.keys():
        if key in d1:
            d1[key] += d2[key]
        else:
            d1[key] = d2[key]

def init(d):
    global d1
    d1 = d

if __name__ == '__main__':

    d1 = multiprocessing.Manager().dict()
    pool = multiprocessing.Pool(initializer=init, initargs=(d1, ))

    l = [{ x % 5 : x } for x in range(10)]

    for item in l:
        pool.apply_async(merge, (item,))

    pool.close()
    pool.join()

    print(l)
    print(d1)
...