Python многопроцессорная блокировка не синхронизирует многопроцессорный подкласс процесса - PullRequest
0 голосов
/ 15 марта 2020

У меня есть около 1000 изображений, которые мне нужно было обработать индивидуально и сохранить результат на диске. Сам процесс очень загружает процессор, поэтому я решил создать подкласс multiprocessing.Process и использовать глобальный буфер для получения результата. Буфер будет иметь определенный размер и будет перезаписан на диск после превышения его размера.

class ImageBuffer:
    def __init__(self):
        self.buffer = []
        self.size = 0
        self.bulk_index = 0

    def add(self, data):
        if self.size == 2000:
            self.persist()
        self.buffer.append(data)
        self.size += 1
        print(f"Adding new image. Current size {self.size}")

    def persist(self):
        self.size = 0
        pass

Синхронизация должна происходить внутри каждого процесса:

from multiprocessing import Process
import multiprocessing


class ImageWorker(Process):
    def __init__(self, buffer: ImageBuffer, lock: multiprocessing.Lock):
        super().__init__()
        self.tasks = []
        self.buffer = buffer
        self.lock = lock

    def add_task(self, task):
        self.tasks.append(task)

    def run(self):
        assert len(self.tasks) != 0
        for _ in range(len(self.tasks)):
            task = self.tasks.pop(0)
            result = process_task(task)

            self.lock.acquire()
            self.buffer.add(result)
            self.lock.release()

И вот как я запускаю процессы:

from .image_worker import ImageWorker
from .image_buffer import ImageBuffer
import multiprocessing


class ImageWorkerPool:
    def __init__(self, num_threads=multiprocessing.cpu_count()):
        self.workers = []
        self.work_index = 0
        self.buffer = ImageBuffer()

        lock = multiprocessing.Lock()
        for _ in range(num_threads):
            self.workers.append(ImageWorker(self.buffer, lock))

    def add_task(self, _image_mask):
        self.workers[self.work_index].add_task(_image_mask)
        self.work_index += 1
        self.work_index = self.work_index % len(self.workers)
        assert self.work_index < len(self.workers)

    def start(self):
        for worker in self.workers:
            worker.start()

    def complete(self):
        for worker in self.workers:
            worker.join()
        self.buffer.persist()

Как вы видите блокировку создается с помощью модуля multiprocessing, поэтому все должно работать. Однако буфер изображения вообще не синхронизируется. Что я делаю не так?

Adding new image. Current size 1
Adding new image. Current size 2
Adding new image. Current size 3
Adding new image. Current size 4
Adding new image. Current size 5
Adding new image. Current size 1
Adding new image. Current size 6
Adding new image. Current size 7
Adding new image. Current size 2
Adding new image. Current size 3
Adding new image. Current size 4
Adding new image. Current size 8
Adding new image. Current size 5
Adding new image. Current size 6
Adding new image. Current size 1
Adding new image. Current size 9
Adding new image. Current size 7
Adding new image. Current size 10
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...