Синхронизация - это одна точка, отсутствующая в вашем примере.manager.list
- это обычный list
в отдельном серверном процессе, который ваши рабочие процессы могут изменять через прокси-объекты.Ваши дальнейшие процессы будут одновременно проверять len(my_array)
.
Нет синхронизации, которая говорит им, что они должны ждать, пока другой процесс завершит свою операцию, состоящую из этой проверки длины и выполнения зависимых от действия действийна результат этой проверки.Ваша операция обновления не является атомарной операцией, вам нужно сделать ее с помощью manager.lock вокруг вашей операции.
В вашем коде есть другая проблема, когда вы перепривязываете my_array
, чтобы указать наобычный список ['y']
, вместо обновления / модификации общего manager.list
.Вы не модифицируете manager.list
процессами, которые устанавливают my_array = ['y']
, manager.list
сохраняет его значение ['a', 'b', 'c', 'x']
от первой модификации до первого рабочего процесса до конца вашей программы.
from multiprocessing import Process, Manager
def f(my_array, lock):
with lock:
if len(my_array) < 4:
my_array.append('x')
else:
my_array[:] = [] # clear list inplace by assigning
# empty list to slice of manager.list
my_array.append('y')
print(my_array)
if __name__ == '__main__':
N_WORKERS = 5
with Manager() as manager:
my_array = manager.list(['a', 'b', 'c'])
lock = manager.Lock()
pool = [
Process(target=f, args=(my_array, lock)) for _ in range(N_WORKERS)
]
for p in pool:
p.start()
for p in pool:
p.join()
# Leaving the context-manager block will shut down the manager-process.
# We need to convert the manager-list to a normal list in the parent
# to keep its values available for further processing in the parent.
result = list(my_array)
print(f'result: {result}')
Пример вывода:
['a', 'b', 'c', 'x']
['y']
['y', 'x']
['y', 'x', 'x']
['y', 'x', 'x', 'x']
result: ['y', 'x', 'x', 'x']
Process finished with exit code 0