Как я могу использовать общий / управляемый словарь с пулом процессов (Python 3.x) - PullRequest
0 голосов
/ 02 марта 2019

Я работаю над проектом, который требует от меня извлечения тонны информации из некоторых файлов.Формат и большая часть информации о проекте не имеют значения для того, что я собираюсь спросить.Я в основном не понимаю, как бы я делился этим словарем со всеми процессами в пуле процессов.

Вот мой код (изменил имена переменных и удалил большую часть кода просто для того, чтобы знать части):

import json

import multiprocessing
from multiprocessing import Pool, Lock, Manager

import glob
import os

def record(thing, map):

    with mutex:
        if(thing in map):
            map[thing] += 1
        else:
            map[thing] = 1


def getThing(file, n, map): 
    #do stuff
     thing = file.read()
     record(thing, map)


def init(l):
    global mutex
    mutex = l

def main():

    #create a manager to manage shared dictionaries
    manager = Manager()

    #get the list of filenames to be analyzed
    fileSet1=glob.glob("filesSet1/*")
    fileSet2=glob.glob("fileSet2/*")

    #create a global mutex for the processes to share
    l = Lock()   

    map = manager.dict()
    #create a process pool, give it the global mutex, and max cpu count-1 (manager is its own process)
    with Pool(processes=multiprocessing.cpu_count()-1, initializer=init, initargs=(l,)) as pool:
        pool.map(lambda file: getThing(file, 2, map), fileSet1) #This line is what i need help with

main()

Из того, что я понимаю, эта лямда-функция должна работать.Строка, с которой мне нужна помощь, это: pool.map (лямбда-файл: getThing (file, 2, map), fileSet1).Это дает мне ошибку там.Ошибка: «AttributeError: Cant pickle локальный объект 'main ..'".

Буду признателен за любую помощь!

1 Ответ

0 голосов
/ 02 марта 2019

Чтобы выполнить задачи параллельно, multiprocessing «перехватывает» функцию задачи.В вашем случае эта «функция задачи» имеет вид lambda file: getThing(file, 2, map).

К сожалению, для вас по умолчанию лямбда-функции не могут быть перехвачены в python (см. Также этот пост стекопотоков ).Позвольте мне проиллюстрировать проблему с минимальным битом кода:

import multiprocessing

l = range(12)

def not_a_lambda(e):
    print(e)

def main():
    with multiprocessing.Pool() as pool:
        pool.map(not_a_lambda, l)        # Case (A)
        pool.map(lambda e: print(e), l)  # Case (B)

main()

В Случай A у нас есть правильная, свободная функция, которую можно активировать, таким образом, операция pool.map будет работать,В случае B у нас есть лямбда-функция, и происходит сбой.

Одним из возможных решений является использование правильной функции области видимости модуля (например, моего not_a_lambda).Другим решением является использование стороннего модуля, такого как укроп , для расширения функций травления.В последнем случае вы будете использовать, например, pathos вместо обычного multiprocessing модуля.Наконец, вы можете создать класс Worker, который будет собирать ваше общее состояние в качестве членов.Это может выглядеть примерно так:

import multiprocessing

class Worker:
    def __init__(self, mutex, map):
        self.mutex = mutex
        self.map = map

    def __call__(self, e):
        print("Hello from Worker e=%r" % (e, ))
        with self.mutex:
            k, v = e
            self.map[k] = v
        print("Goodbye from Worker e=%r" % (e, ))

def main():
    manager = multiprocessing.Manager()
    mutex = manager.Lock()
    map = manager.dict()

    # there is only ONE Worker instance which is shared across all processes
    # thus, you need to make sure you don't access / modify internal state of
    # the worker instance without locking the mutex.
    worker = Worker(mutex, map)

    with multiprocessing.Pool() as pool:
        pool.map(worker, l.items())

main()
...