Python: как эффективно открывать и читать zip-файл из нескольких процессов - PullRequest
1 голос
/ 17 января 2020

Я пытаюсь открыть один и тот же файл .zip из нескольких процессов, используя zipfile и multiprocessing. Когда я открываю файл .zip с использованием синтаксиса with zipfile.ZipFile(self.path, 'r') as archive:, код в нескольких процессах работает без проблем, но предварительная производительность ухудшается в 100 раз (в некоторых из шаблонов использования topi c) по сравнению с сохранением открытого ZipFile и вызовите чтение из него (охраняется multiprocessing.Lock() экземпляром). Когда я использую версию «блокировки» из нескольких процессов, я получаю исключение zipfile.BadZipFile.

Я не понимаю, почему.

Вот MNW C:

import os
import time
import numpy as np
import multiprocessing
import zipfile



class ZipFileSource(object):

    def __init__(self, path, access_mode="lock"):
        self.path = path
        self.lock = multiprocessing.Lock()
        self.set_access_mode(access_mode)
        self.files = self.access( lambda archive: archive.infolist())

    def set_access_mode(self, access_mode):

        def access_lock(f):
            with self.lock:
                return f(self.archive)

        def access_file(f):
            with zipfile.ZipFile(self.path, 'r') as archive:
                return f(archive)

        if access_mode == "lock":
            print("lock")
            self.archive = zipfile.ZipFile(self.path, 'r')
            self.access = access_lock
        elif access_mode == "file":
            print("file")
            self.archive = None
            self.access = access_file



    def __del__(self):
        if self.archive is not None:
            self.archive.close()

    def __len__(self):
        return len(self.files)

    def __getitem__(self, index):
        if index >= len(self):
            raise IndexError(f'Index {index} >= len {len(self)}')
        member = self.files[index]
        data_bytes = self.access( lambda archive: archive.read(member))
        return data_bytes


dirname = os.path.join(os.path.dirname(__file__), 'test_filesource_data')

fs_small = ZipFileSource(os.path.join(dirname, 'foo_bar_small.zip'), access_mode="lock")

def read_small(i):
    return fs_small[i%len(fs_small)][42]


if __name__ == '__main__':

    n_frames = 1000
    frames = np.arange(1000, dtype='u4')

    # read_small(0)


    def f(nb_processes, access_mode):

        pool = multiprocessing.Pool(processes=nb_processes)
        fs_small.set_access_mode(access_mode)
        start = time.time()
        res = 0
        for i in pool.imap_unordered(read_small, frames):
            res += i
        delta = time.time() - start
        print(f"access_mode = {access_mode}, nb processes = {nb_processes}, res = {res}, {delta*1e3/len(frames)} ms/frame")
        return res

    f(1, "file")
    f(4, "file")
    f(4, "lock") 
    f(4, "lock") #crash

Вот мой вывод консоли:

lock
file
access_mode = file, nb processes = 1, res = 110289, 0.08039402961730957 ms/frame
file
access_mode = file, nb processes = 4, res = 110289, 0.32297492027282715 ms/frame
lock
access_mode = lock, nb processes = 4, res = 110289, 0.2950408458709717 ms/frame
lock
multiprocessing.pool.RemoteTraceback: 

Traceback (most recent call last):
  File "/usr/lib/python3.6/multiprocessing/pool.py", line 119, in worker
    result = (True, func(*args, **kwds))
  File "/path/to/script/test_filesource.py", line 64, in read_small
    return fs_small[i%len(fs_small)][42]
  File "/path/to/script/test_filesource.py", line 55, in __getitem__
    data_bytes = self.access( lambda archive: archive.read(member))
  File "/path/to/script/test_filesource.py", line 27, in access_lock
    return f(self.archive)
  File "/path/to/script/test_filesource.py", line 55, in <lambda>
    data_bytes = self.access( lambda archive: archive.read(member))
  File "/usr/lib/python3.6/zipfile.py", line 1337, in read
    with self.open(name, "r", pwd) as fp:
  File "/usr/lib/python3.6/zipfile.py", line 1419, in open
    % (zinfo.orig_filename, fname))
zipfile.BadZipFile: File name in directory '00000005.pkl' and header b'00000004.pkl' differ.

Вы можете найти zip-файл, использованный для воспроизведения ошибки здесь отчет об ошибке

...