Совместное использование блокировки между процессами - PullRequest
6 голосов
/ 09 мая 2019

Я пытался следовать этому решению , а также этому решению , но пока что безуспешно:

Когда я запускаю следующий блок кода:

global manager
global lock
manager = Manager()
lock = manager.Lock()

class MyClass(object):

    def get_next_chunk(self, numberlist, chunks):
        for i in range(0, len(numberlist), chunks):
            yield numberlist[i:i + chunks]

    def multi_process(self, numberlist):
        procs = 5
        chunksize = 100
        with Pool(procs) as pool:
            pool.map(self.process_numberlist,
                  self.get_next_chunk(numberlist, chunksize))
        return self.running_total_list

    def process_numberlist(self, numberlist):
        temp_num_list = []
        temp_num_list = self.getnewNumbers()
        logger.debug("temp_num_list length: " + str(len(temp_num_list)))
        try:
            lock.acquire()
        except Exception as e:
            logger.error("Couldn't acquire lock")
            logger.error(e)
            traceback.format_exc()
            logger.error(sys.exc_info()[0])
        self.running_total_list = self.running_total_list + temp
        logger.debug("New running_total_list length: "
                    + str(len(self.running_total_list)))
        lock.release()
        break

Вывод в моих журналах выглядит следующим образом:

[process_numberlist() ] temp_num_list length: 5
[process_numberlist() ] New running_total_list result set length: 5
[process_numberlist() ] temp_num_list length: 6
[process_numberlist() ] New running_total_list result set length: 6
[process_numberlist() ] temp_num_list length: 4
[process_numberlist() ] New running_total_list result set length: 9

Когда мой ожидаемый вывод, я считаю, должен выглядеть следующим образом:

[process_numberlist() ] temp_num_list length: 5
[process_numberlist() ] New running_total_list result set length: 5
[process_numberlist() ] temp_num_list length: 6
[process_numberlist() ] New running_total_list result set length: 11
[process_numberlist() ] temp_num_list length: 4
[process_numberlist() ] New running_total_list result set length: 15

Правка - Попытка 2

См. Обновление на основе предложения Аарона.Теперь получая ошибку «могу присоединиться только к итерации»

global manager
global lock

class MyClass(object):

    def get_next_chunk(self, numberlist, chunks):
        for i in range(0, len(numberlist), chunks):
            yield numberlist[i:i + chunks]

    def multi_process(self, numberlist):
        procs = 5
        chunksize = 100
        manager = Manager()
        lock = manager.Lock()
        with Pool(procs) as pool:
            func = partial(self.process_numberlist, lock)
            pool.map(function,
              self.get_next_chunk(numberlist, chunksize))
        return self.running_total_list

    def process_numberlist(self, numberlist, lock):
        temp_num_list = []
        temp_num_list = self.getnewNumbers()
        logger.debug("temp_num_list length: " + str(len(temp_num_list)))
        try:
             lock.acquire()
             self.running_total_list = self.running_total_list + temp_num_list
             logger.debug("New running_total_list length: "
                + str(len(self.running_total_list)))
             lock.release()
        except Exception as e:
            logger.error("Couldn't acquire lock")
            logger.error(e)
            traceback.format_exc()
            logger.error(sys.exc_info()[0])
        break

EDIT # 3 - getNewNumbers (), которая не включена в этот игрушечный пример, просто возвращает массив целых чисел.Надеюсь, это поможет

Ответы [ 2 ]

0 голосов
/ 19 мая 2019

Вы, кажется, путаете концепцию OOPS и IPC вместе.

Смотрите здесь, я создаю экземпляр класса A как a в материнском процессе. И я называю метод a.go от того же материнского процесса. Поскольку метод a.go вызывает multiprocessing.Pool(2), создаются два дочерних процесса. Теперь у нас есть три процесса. Одна мама и двое детей.

У каждого своя версия a. Одна мама и двое детей сейчас, но три версии экземпляра a. Я только что создал один экземпляр A как a в матери. Кто создал два других? Это ОС и Pickling в действии. Дети получают все объекты своей матери, когда она создается ОС. Если дочерний элемент изменяет свою версию a, другие версии a не затрагиваются.

import multiprocessing
import os

class A:
    def __init__(self):
        self.numbers = []

    def add(self, n):
        self.numbers.append(n)
        me = multiprocessing.current_process()
        print('mom: {}, my-pid: {}, data: {}'.format(os.getppid(), me.ident,
                                                     self.numbers))

    def go(self):
        with multiprocessing.Pool(2) as workers:
            workers.map(self.add, range(1, 4))


if __name__ == '__main__':
    a = A()
    a.go()
    print('pid: {}, data: {}'.format(multiprocessing.current_process().ident,
                                     a.numbers))

Выходной;

mom: 10029, my-pid: 10030, data: [1]
mom: 10029, my-pid: 10031, data: [2]
mom: 10029, my-pid: 10030, data: [3]
pid: 10029, data: []

Здесь только двое детей: pid 10030 и pid 10031. Они добавили 3 элемента к a.numbers. Таким образом, один из них должен был добавить два элемента, но при печати он показывает только добавленный элемент. Малыш с пидом 10030 должен показать [1, 3]. Что здесь происходит?
Давайте инициализируем список a.numbers для [0] в матери и print a.numbers перед добавлением в детей.

import multiprocessing
import os

class A:
    def __init__(self):
        self.numbers = []

    def add(self, n):
        me = multiprocessing.current_process()
        print('mom: {}, my-pid: {}, previous-data: {}'.format(
            os.getppid(), me.ident, self.numbers))
        self.numbers.append(n)
        print('mom: {}, my-pid: {}, current-data: {}'.format(
            os.getppid(), me.ident, self.numbers))

    def go(self):
        with multiprocessing.Pool(2) as workers:
            workers.map(self.add, range(1, 4))


if __name__ == '__main__':
    a = A()
    a.numbers.append(0)
    a.go()
    print('pid: {}, data: {}'.format(multiprocessing.current_process().ident,
                                     a.numbers))

Выходной;

mom: 10407, my-pid: 10408, previous-data: [0]
mom: 10407, my-pid: 10408, current-data: [0, 1]
mom: 10407, my-pid: 10409, previous-data: [0]
mom: 10407, my-pid: 10409, current-data: [0, 2]
mom: 10407, my-pid: 10408, previous-data: [0]
mom: 10407, my-pid: 10408, current-data: [0, 3]
pid: 10407, data: [0]

Все, что мать имела в a.numbers, обнаружилось у детей. Но ребенок с пидом 10408, добавивший 2 элемента, не сохранил ранее добавленный элемент.
Теперь давайте проверим экземпляр a, данные которого мы изменяем, - это один и тот же экземпляр, или каждый a отличается, даже если pid одинаковый.

import multiprocessing
import os

class A:
    def __init__(self):
        self.numbers = []

    def __str__(self):
        return '<{}>'.format(', '.join(str(x) for x in self.numbers))

    def __del__(self):
        me = multiprocessing.current_process()
        print("I'm being destroyed, my pid: {}, data: {}".format(me.ident, self))

    def add(self, n):
        me = multiprocessing.current_process()
        self.numbers.append(n)
        print('mom: {}, my-pid: {}, current-data: {}'.format(
            os.getppid(), me.ident, self.numbers))

    def go(self):
        with multiprocessing.Pool(2) as workers:
            workers.map(self.add, range(1, 4))


if __name__ == '__main__':
    a = A()
    a.numbers.append(0)
    a.go()
    print('pid: {}, data: {}'.format(multiprocessing.current_process().ident,
                                     a.numbers))

Выходной;

mom: 11881, my-pid: 11883, current-data: [0, 2]
mom: 11881, my-pid: 11882, current-data: [0, 1]
I'm being destroyed, my pid: 11882, data: <0, 1>
I'm being destroyed, my pid: 11883, data: <0, 2>
mom: 11881, my-pid: 11883, current-data: [0, 3]
I'm being destroyed, my pid: 11883, data: <0, 3>
pid: 11881, data: [0]
I'm being destroyed, my pid: 11881, data: <0>

Из вышеприведенного вывода очевидно, что дочерний процесс не завершен, потому что мы могли видеть, что pid такой же, но объект a уничтожен. Таким образом, процесс остается тем же, но экземпляр a копируется из матери.
Как делить объекты между процессами? multiprocessing.Manager на помощь.

import multiprocessing
import os


class A:
    def __init__(self):
        manager = multiprocessing.Manager()
        self.numbers = manager.list()

    def __str__(self):
        return '<{}>'.format(self.numbers)

    def __del__(self):
        me = multiprocessing.current_process()
        print("I'm being destroyed, my pid: {}, data: {}".format(
            me.ident, self))

    def add(self, n):
        me = multiprocessing.current_process()
        self.numbers.append(n)
        print('mom: {}, my-pid: {}, current-data: {}'.format(
            os.getppid(), me.ident, self.numbers))

    def go(self):
        with multiprocessing.Pool(2) as workers:
            workers.map(self.add, range(1, 4))


if __name__ == '__main__':
    a = A()
    a.numbers.append(0)
    a.go()
    print('pid: {}, data: {}'.format(multiprocessing.current_process().ident,
                                     a.numbers))

Выходной;

mom: 12296, my-pid: 12303, current-data: [0, 1]
I'm being destroyed, my pid: 12303, data: <[0, 1, 2]>
mom: 12296, my-pid: 12304, current-data: [0, 1, 2]
I'm being destroyed, my pid: 12304, data: <[0, 1, 2]>
mom: 12296, my-pid: 12303, current-data: [0, 1, 2, 3]
I'm being destroyed, my pid: 12303, data: <[0, 1, 2, 3]>
pid: 12296, data: [0, 1, 2, 3]
I'm being destroyed, my pid: 12296, data: <<ListProxy object, typeid 'list' at 0x7f69aa037048; '__str__()' failed>>

Данные теперь распределяются между процессами, но с некоторыми накладными расходами.

class A:
    def __init__(self):
        print('children: {}'.format(multiprocessing.active_children()))
        manager = multiprocessing.Manager()
        print('children: {}'.format(multiprocessing.active_children()))
        self.numbers = manager.list()

if __name__ == '__main__':
    a = A()

Выходной;

children: []
children: [<ForkProcess(SyncManager-1, started)>]

Существует дополнительный процесс для обмена объектами.
Как решить эту проблему без накладных расходов? Заставьте детей обрабатывать и возвращать данные, а также строить list в матери.

import multiprocessing


class A:
    def __init__(self):
        self.numbers = []

    def add(self, n):
        return [n]

    def go(self):
        with multiprocessing.Pool(2) as workers:
            for lst in workers.map(self.add, range(1, 4)):
                self.numbers.extend(lst)
            print('active children: {}'.format(
                [p.ident for p in multiprocessing.active_children()]))


if __name__ == '__main__':
    a = A()
    a.numbers.append(0)
    a.go()
    print('pid: {}, data: {}'.format(multiprocessing.current_process().ident,
                                     a.numbers))

Выходной;

active children: [13436, 13435]
pid: 13434, data: [0, 1, 2, 3]
0 голосов
/ 18 мая 2019

Мне кажется, что вашей главной целью здесь является доступ к общему ресурсу (running_total_list), именно поэтому я сосредотачиваюсь именно на этом.

В вашем примере вы использовали Poolтогда как я использовал Process.Вы можете взглянуть на эту статью об основных различиях между ними и решить, что больше подходит для вашего случая использования.

Я пришел к этому быстрому примеру о том, как распределить ресурсы междунесколько процессов.Это должно дать вам хорошее представление о том, как действовать дальше:

from multiprocessing import Process, Lock, Manager

def gen_numbers():
    import random
    return [i for i in range(random.randint(4,11))]

def process_numberlist(lock, shared_list, num):
    temp_num_list = gen_numbers()
    print("Proc %s: temp_num_list length: %s" %(num, len(temp_num_list)))

    try:
        lock.acquire()
        shared_list += temp_num_list
        print("Proc %s: New shared_list length: %s" %(num, len(shared_list)))
    finally:
        lock.release()

lock = Lock()
manager = Manager()
shared_list = manager.list()

proc = 5
proc_list = []

for num in range(proc):
    p = Process(target=process_numberlist, args=( lock, shared_list, num+1, ))
    p.start()

    proc_list.append( p )

for p in proc_list:
    p.join()

Одна важная вещь, на которую следует обратить внимание - это определение shared_list здесь.В отличие от потоков, каждый процесс имеет свое собственное пространство памяти (Pool не будет исключением), поэтому совместное использование данных между ними не работает.Это означает, что вам нужно реализовать какую-то межпроцессную коммуникацию (IPC), и, к счастью, Python уже предоставляет вам некоторые инструменты для этого.Одним из которых является multiprocessing.Manager.Он предоставляет некоторые структуры данных (например, dict или list), которые вы можете использовать для обмена между своими процессами.

То же самое относится и к Lock в этом случае.Это важно, так как вы не хотите получать доступ к общей памяти из нескольких процессов одновременно.Это просто сделает вашу программу непредсказуемой.

Еще одна вещь, на которую следует обратить внимание: выполнение process_numberlist происходит не обязательно в порядке , так как каждый процесс выполняется независимо друг от друга, но ониу всех есть доступ к одному и тому же ресурсу.

Надеюсь, это поможет!

...