У меня есть около 1000 изображений, которые мне нужно было обработать индивидуально и сохранить результат на диске. Сам процесс очень загружает процессор, поэтому я решил создать подкласс multiprocessing.Process
и использовать глобальный буфер для получения результата. Буфер будет иметь определенный размер и будет перезаписан на диск после превышения его размера.
class ImageBuffer:
def __init__(self):
self.buffer = []
self.size = 0
self.bulk_index = 0
def add(self, data):
if self.size == 2000:
self.persist()
self.buffer.append(data)
self.size += 1
print(f"Adding new image. Current size {self.size}")
def persist(self):
self.size = 0
pass
Синхронизация должна происходить внутри каждого процесса:
from multiprocessing import Process
import multiprocessing
class ImageWorker(Process):
def __init__(self, buffer: ImageBuffer, lock: multiprocessing.Lock):
super().__init__()
self.tasks = []
self.buffer = buffer
self.lock = lock
def add_task(self, task):
self.tasks.append(task)
def run(self):
assert len(self.tasks) != 0
for _ in range(len(self.tasks)):
task = self.tasks.pop(0)
result = process_task(task)
self.lock.acquire()
self.buffer.add(result)
self.lock.release()
И вот как я запускаю процессы:
from .image_worker import ImageWorker
from .image_buffer import ImageBuffer
import multiprocessing
class ImageWorkerPool:
def __init__(self, num_threads=multiprocessing.cpu_count()):
self.workers = []
self.work_index = 0
self.buffer = ImageBuffer()
lock = multiprocessing.Lock()
for _ in range(num_threads):
self.workers.append(ImageWorker(self.buffer, lock))
def add_task(self, _image_mask):
self.workers[self.work_index].add_task(_image_mask)
self.work_index += 1
self.work_index = self.work_index % len(self.workers)
assert self.work_index < len(self.workers)
def start(self):
for worker in self.workers:
worker.start()
def complete(self):
for worker in self.workers:
worker.join()
self.buffer.persist()
Как вы видите блокировку создается с помощью модуля multiprocessing
, поэтому все должно работать. Однако буфер изображения вообще не синхронизируется. Что я делаю не так?
Adding new image. Current size 1
Adding new image. Current size 2
Adding new image. Current size 3
Adding new image. Current size 4
Adding new image. Current size 5
Adding new image. Current size 1
Adding new image. Current size 6
Adding new image. Current size 7
Adding new image. Current size 2
Adding new image. Current size 3
Adding new image. Current size 4
Adding new image. Current size 8
Adding new image. Current size 5
Adding new image. Current size 6
Adding new image. Current size 1
Adding new image. Current size 9
Adding new image. Current size 7
Adding new image. Current size 10