Синхронизация записи в разделяемую память (список) в многопроцессорной среде Python - PullRequest
0 голосов
/ 10 декабря 2018

У меня есть следующий код:

import multiprocessing
manager = multiprocessing.Manager()

Функция, которая добавляет список, если его длина меньше 4, или создает новый с начальным значением 'y'.

def f(my_array):
    if len(my_array) < 4:
        my_array.append('x')
    else:
        my_array = ['y']
    print(my_array)

Инициализация списка и создание процессов.

if __name__ == '__main__':
    my_array = manager.list(['a', 'b', 'c'])

    p1 = Process(target=f, args=(my_array))
    p2 = Process(target=f, args=(my_array))
    p3 = Process(target=f, args=(my_array))
    p4 = Process(target=f, args=(my_array))
    p5 = Process(target=f, args=(my_array))

    p1.start()
    p2.start()
    p3.start()
    p4.start()
    p5.start()

    p1.join()
    p2.join()
    p3.join()
    p4.join()
    p5.join()

Вывод, который я получил:

['a', 'b', 'c', 'x']
['y']
['y']
['y'] 
['y']

Я не понимаю, почему список добавляется только один раз.Я ожидал, что в третьей строке вывода я буду наблюдать список ['y'], добавленный 'x', поэтому ['y', 'x'], четвертый ['y', 'x', 'x']и так далее.Кажется, что разделяемая память имеет утечку или не позволяет вносить изменения в функции из нескольких процессов.Что я могу сделать, чтобы включить целевое поведение?

1 Ответ

0 голосов
/ 10 декабря 2018

Синхронизация - это одна точка, отсутствующая в вашем примере.manager.list - это обычный list в отдельном серверном процессе, который ваши рабочие процессы могут изменять через прокси-объекты.Ваши дальнейшие процессы будут одновременно проверять len(my_array).

Нет синхронизации, которая говорит им, что они должны ждать, пока другой процесс завершит свою операцию, состоящую из этой проверки длины и выполнения зависимых от действия действийна результат этой проверки.Ваша операция обновления не является атомарной операцией, вам нужно сделать ее с помощью manager.lock вокруг вашей операции.

В вашем коде есть другая проблема, когда вы перепривязываете my_array, чтобы указать наобычный список ['y'], вместо обновления / модификации общего manager.list.Вы не модифицируете manager.list процессами, которые устанавливают my_array = ['y'], manager.list сохраняет его значение ['a', 'b', 'c', 'x'] от первой модификации до первого рабочего процесса до конца вашей программы.

from multiprocessing import Process, Manager


def f(my_array, lock):
    with lock:
        if len(my_array) < 4:
            my_array.append('x')
        else:
            my_array[:] = []  # clear list inplace by assigning
            # empty list to slice of manager.list
            my_array.append('y')
    print(my_array)


if __name__ == '__main__':

    N_WORKERS = 5

    with Manager() as manager:

        my_array = manager.list(['a', 'b', 'c'])
        lock = manager.Lock()

        pool = [
            Process(target=f, args=(my_array, lock)) for _ in range(N_WORKERS)
        ]

        for p in pool:
            p.start()
        for p in pool:
            p.join()

        # Leaving the context-manager block will shut down the manager-process.
        # We need to convert the manager-list to a normal list in the parent
        # to keep its values available for further processing in the parent.
        result = list(my_array)

    print(f'result: {result}')

Пример вывода:

['a', 'b', 'c', 'x']
['y']
['y', 'x']
['y', 'x', 'x']
['y', 'x', 'x', 'x']
result: ['y', 'x', 'x', 'x']

Process finished with exit code 0
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...