Я пытаюсь открыть один и тот же файл .zip из нескольких процессов, используя zipfile
и multiprocessing
. Когда я открываю файл .zip с использованием синтаксиса with zipfile.ZipFile(self.path, 'r') as archive:
, код в нескольких процессах работает без проблем, но предварительная производительность ухудшается в 100 раз (в некоторых из шаблонов использования topi c) по сравнению с сохранением открытого ZipFile и вызовите чтение из него (охраняется multiprocessing.Lock()
экземпляром). Когда я использую версию «блокировки» из нескольких процессов, я получаю исключение zipfile.BadZipFile
.
Я не понимаю, почему.
Вот MNW C:
import os
import time
import numpy as np
import multiprocessing
import zipfile
class ZipFileSource(object):
def __init__(self, path, access_mode="lock"):
self.path = path
self.lock = multiprocessing.Lock()
self.set_access_mode(access_mode)
self.files = self.access( lambda archive: archive.infolist())
def set_access_mode(self, access_mode):
def access_lock(f):
with self.lock:
return f(self.archive)
def access_file(f):
with zipfile.ZipFile(self.path, 'r') as archive:
return f(archive)
if access_mode == "lock":
print("lock")
self.archive = zipfile.ZipFile(self.path, 'r')
self.access = access_lock
elif access_mode == "file":
print("file")
self.archive = None
self.access = access_file
def __del__(self):
if self.archive is not None:
self.archive.close()
def __len__(self):
return len(self.files)
def __getitem__(self, index):
if index >= len(self):
raise IndexError(f'Index {index} >= len {len(self)}')
member = self.files[index]
data_bytes = self.access( lambda archive: archive.read(member))
return data_bytes
dirname = os.path.join(os.path.dirname(__file__), 'test_filesource_data')
fs_small = ZipFileSource(os.path.join(dirname, 'foo_bar_small.zip'), access_mode="lock")
def read_small(i):
return fs_small[i%len(fs_small)][42]
if __name__ == '__main__':
n_frames = 1000
frames = np.arange(1000, dtype='u4')
# read_small(0)
def f(nb_processes, access_mode):
pool = multiprocessing.Pool(processes=nb_processes)
fs_small.set_access_mode(access_mode)
start = time.time()
res = 0
for i in pool.imap_unordered(read_small, frames):
res += i
delta = time.time() - start
print(f"access_mode = {access_mode}, nb processes = {nb_processes}, res = {res}, {delta*1e3/len(frames)} ms/frame")
return res
f(1, "file")
f(4, "file")
f(4, "lock")
f(4, "lock") #crash
Вот мой вывод консоли:
lock
file
access_mode = file, nb processes = 1, res = 110289, 0.08039402961730957 ms/frame
file
access_mode = file, nb processes = 4, res = 110289, 0.32297492027282715 ms/frame
lock
access_mode = lock, nb processes = 4, res = 110289, 0.2950408458709717 ms/frame
lock
multiprocessing.pool.RemoteTraceback:
Traceback (most recent call last):
File "/usr/lib/python3.6/multiprocessing/pool.py", line 119, in worker
result = (True, func(*args, **kwds))
File "/path/to/script/test_filesource.py", line 64, in read_small
return fs_small[i%len(fs_small)][42]
File "/path/to/script/test_filesource.py", line 55, in __getitem__
data_bytes = self.access( lambda archive: archive.read(member))
File "/path/to/script/test_filesource.py", line 27, in access_lock
return f(self.archive)
File "/path/to/script/test_filesource.py", line 55, in <lambda>
data_bytes = self.access( lambda archive: archive.read(member))
File "/usr/lib/python3.6/zipfile.py", line 1337, in read
with self.open(name, "r", pwd) as fp:
File "/usr/lib/python3.6/zipfile.py", line 1419, in open
% (zinfo.orig_filename, fname))
zipfile.BadZipFile: File name in directory '00000005.pkl' and header b'00000004.pkl' differ.
Вы можете найти zip-файл, использованный для воспроизведения ошибки здесь отчет об ошибке