Я немного не уверен в безопасности потоков и многопроцессорности.
Насколько я могу судить, multiprocessing.Pool.map обрабатывает вызывающую функцию или объект, но оставляет элементы, переданные по ссылкам, нетронутыми.
Похоже, это может быть полезно, поскольку это экономит память, но я не нашел никакой информации о безопасности потоков в этих объектах.
В моем случае я пытаюсь прочитать numpy данные с диска, однако я хочу иметь возможность изменять источник без изменения реализации, поэтому я разбил часть чтения на его собственные классы.
У меня примерно следующая ситуация:
import numpy as np
from multiprocessing import Pool
class NpReader():
def read_row(self, row_index):
pass
class NpReaderSingleFile(NpReader):
def read_row(self, row_index):
return np.load(self._filename_from_row(row_index))
def _filename_from_row(self, row_index):
return Path(row_index).with_suffix('.npy')
class NpReaderBatch(NpReader):
def __init__(self, batch_file, mmap_mode=None):
self.batch = np.load(batch_file, mmap_mode=mmap_mode)
def read_row(self, row_index):
read_index = row_index
return self.batch[read_index]
class ProcessRow():
def __init__(self, reader):
self.reader = reader
def __call__(self, row_index):
return reader.read_row(row_index).shape
readers = [
NpReaderSingleFile(),
NpReaderBatch('batch.npy'),
NpReaderBatch('batch.npy', mmap_mode='r')
]
res = []
for reader in readers:
with Pool(12) as mp:
res.append(mp.map(ProcessRow(reader), range(100000))
Мне кажется, что здесь есть много вещей, которые могут go ошибаться, но я, к сожалению, не знаю, чтобы определить какой тест на это.
Есть ли очевидные проблемы с описанным выше подходом?
Некоторые вещи, которые пришли мне в голову:
- np.load (кажется, он хорошо работает для небольших отдельных файлов, но могу ли я проверить его, чтобы убедиться, что это безопасно?
- Безопасен ли NpReaderBatch или может ли read_index быть изменен одновременно разными процессами?