Можно ли выполнять параллельную запись в один файл h5py с помощью многопроцессорной обработки? - PullRequest
0 голосов
/ 06 февраля 2019

Я написал простую программу, которая посмотрела бы, смогу ли я наивно заполнить файл hdf5 фиктивными данными.

import sys                                                                                                                                                                                                                        
import time                                                                                                                                                                                                                       
import multiprocessing as mp                                                                                                                                                                                                      
import numpy as np                                                                                                                                                                                                                
import h5py                                                                                                                                                                                                                       


def parallel_h5write(hdf, key, data):                                                                                                                                                                                             
    dset = hdf.create_dataset(key, shape=data.shape, dtype=data.dtype)                                                                                                                                                            
    dset[:] = data                                                                                                                                                                                                                
    hdf.flush()                                                                                                                                                                                                                   
    return None                                                                                                                                                                                                                   


large_data = np.array([                                                                                                                                                                                                           
    float(j)*np.ones(20000) for j in xrange(1, 20001)                                                                                                                                                                             
])                                                                                                                                                                                                                                


start = time.time()                                                                                                                                                                                                               
h5file = h5py.File('serial.h5', 'w')                                                                                                                                                                                              
for j, data in enumerate(large_data):                                                                                                                                                                                             
    nm = "name{0}".format(j)                                                                                                                                                                                                      
    dset = h5file.create_dataset(nm, shape=data.shape, dtype=data.dtype)                                                                                                                                                          
    dset[:] = data                                                                                                                                                                                                                
    h5file.flush()                                                                                                                                                                                                                
h5file.close()                                                                                                                                                                                                                    
finish = time.time()                                                                                                                                                                                                              
print "serial write took {0} seconds".format(finish - start)                                                                                                                                                                      
start = time.time()                                                                                                                                                                                                               


h5file = h5py.File('parallel.h5', 'w')                                                                                                                                                                                            
arglist = [(h5file, 'name{0}'.format(j), data) for j, data in enumerate(large_data)]                                                                                                                                              
p = mp.Pool(4)                                                                                                                                                                                                                    
try:                                                                                                                                                                                                                              
    p.map(parallel_h5write, arglist)                                                                                                                                                                                              
except Exception, err:                                                                                                                                                                                                            
    p.close()                                                                                                                                                                                                                     
    p.join()                                                                                                                                                                                                                      
    print err                                                                                                                                                                                                                     
    sys.exit()                                                                                                                                                                                                                    
p.close()                                                                                                                                                                                                                         
p.join()                                                                                                                                                                                                                          
finish = time.time()                                                                                                                                                                                                              
print "parallel write took {0} seconds".format(finish - start)

Я получил следующее сообщение об ошибке:

Exception KeyError: KeyError(0,) in 'h5py._objects.ObjectID.__dealloc__' ignored
Process PoolWorker-1:
Traceback (most recent call last):
  File "/home/user/anaconda2/lib/python2.7/multiprocessing/process.py", line 267, in _bootstrap
    self.run()
  File "/home/user/anaconda2/lib/python2.7/multiprocessing/process.py", line 114, in run
    self._target(*self._args, **self._kwargs)
  File "/home/user/anaconda2/lib/python2.7/multiprocessing/pool.py", line 102, in worker
    task = get()
  File "/home/user/anaconda2/lib/python2.7/multiprocessing/queues.py", line 376, in get
    return recv()
  File "stringsource", line 5, in h5py.h5f.__pyx_unpickle_FileID
  File "h5py/_objects.pyx", line 178, in h5py._objects.ObjectID.__cinit__
TypeError: __cinit__() takes exactly 1 positional argument (0 given)
Traceback (most recent call last):
  File "h5py/_objects.pyx", line 200, in h5py._objects.ObjectID.__dealloc__
KeyError: 0

Нет необходимости говорить, что я не уверен, что делать с этим сообщением об ошибке.Что я делаю неправильно?Что я мог сделать, чтобы это исправить?Могу ли я на самом деле это сделать?

...