Как я могу обработать tar-файл с помощью многопроцессорного пула Python? - PullRequest
7 голосов
/ 24 ноября 2011

Я пытаюсь обработать содержимое файла tarfile, используя multiprocessing.Pool. Я могу успешно использовать реализацию ThreadPool в многопроцессорном модуле, но хотел бы иметь возможность использовать процессы вместо потоков, поскольку это могло бы быть быстрее и исключить некоторые изменения, внесенные в Matplotlib для обработки многопоточной среды. Я получаю сообщение об ошибке, которое, как я подозреваю, связано с процессами, не разделяющими адресное пространство, но я не уверен, как это исправить:

Traceback (most recent call last):
  File "test_tarfile.py", line 32, in <module>
    test_multiproc()
  File "test_tarfile.py", line 24, in test_multiproc
    pool.map(read_file, files)
  File "/ldata/whitcomb/epd-7.1-2-rh5-x86_64/lib/python2.7/multiprocessing/pool.py", line 225, in map
    return self.map_async(func, iterable, chunksize).get()
  File "/ldata/whitcomb/epd-7.1-2-rh5-x86_64/lib/python2.7/multiprocessing/pool.py", line 522, in get
    raise self._value
ValueError: I/O operation on closed file

Реальная программа более сложная, но это пример того, что я делаю, которая воспроизводит ошибку:

from multiprocessing.pool import ThreadPool, Pool
import StringIO
import tarfile

def write_tar():
    tar = tarfile.open('test.tar', 'w')
    contents = 'line1'
    info = tarfile.TarInfo('file1.txt')
    info.size = len(contents)
    tar.addfile(info, StringIO.StringIO(contents))
    tar.close()

def test_multithread():
    tar   = tarfile.open('test.tar')
    files = [tar.extractfile(member) for member in tar.getmembers()]
    pool  = ThreadPool(processes=1)
    pool.map(read_file, files)
    tar.close()

def test_multiproc():
    tar   = tarfile.open('test.tar')
    files = [tar.extractfile(member) for member in tar.getmembers()]
    pool  = Pool(processes=1)
    pool.map(read_file, files)
    tar.close()

def read_file(f):
    print f.read()

write_tar()
test_multithread()
test_multiproc()

Я подозреваю, что что-то не так, когда объект TarInfo передается в другой процесс, а родительский TarFile нет, но я не уверен, как это исправить в многопроцессорном случае. Могу ли я сделать это без необходимости извлекать файлы из архива и записывать их на диск?

1 Ответ

6 голосов
/ 24 ноября 2011

Вы не передаете объект TarInfo в другой процесс, вы передаете результат tar.extractfile(member) в другой процесс, где member является объектом TarInfo.Метод extractfile(...) возвращает файлоподобный объект, который, помимо прочего, имеет метод read(), который работает с исходным файлом tar, который вы открыли с помощью tar = tarfile.open('test.tar').

.открыть файл из одного процесса в другом процессе, вы должны заново открыть файл.Я заменил ваш test_multiproc() на это:

def test_multiproc():
    tar   = tarfile.open('test.tar')
    files = [name for name in tar.getnames()]
    pool  = Pool(processes=1)
    result = pool.map(read_file2, files)
    tar.close()

И добавил это:

def read_file2(name):
    t2 = tarfile.open('test.tar')
    print t2.extractfile(name).read()
    t2.close()

и смог заставить ваш код работать.

...