Запись с многопроцессорностью в один файл HDF5 размером Python 3 - PullRequest
0 голосов
/ 05 мая 2020

Я бы хотел ускорить свой сценарий за счет возможности многопроцессорной обработки. У меня есть следующий рабочий сценарий:

#!/usr/bin/env python3

import os
import sys
import h5py as h5
import numpy as np

if __name__ == "__main__":

    file_cxi = sys.argv[1]
    mask_cxi = sys.argv[2]
    mask_3D_inv_file = sys.argv[3]
    num = sys.argv[4]
    file_cxi_copy = sys.argv[5]

    mask_3D = h5.File(mask_3D_inv_file, 'r')
    mask_3D_inv = np.array(mask_3D['/data/data'], dtype=np.float32)
    mask_3D.close()

    h5r = h5.File(file_cxi, 'r')
    mask = h5r[mask_cxi]

    with h5.File(file_cxi_copy, 'a') as h5w:
        d = h5w.create_dataset(path_to_new_mask, (1,16,512,128), maxshape=(num,16,512,128), dtype=np.float32,chunks=(1,16,512,128), compression="gzip", compression_opts=4)

        for i in range(num):
            d.resize((i+1,16,512,128))
            slice_arr = mask[i,:,:,:]
            slice_arr = np.array(slice_arr)
            index_bad = np.where(slice_arr > 0)

            slice_arr_upt = np.zeros_like(slice_arr)
            slice_arr_upt[index_bad] = 1

            tmp = np.bitwise_or(slice_arr_upt, mask_3D_inv)
            tmp[index_bad] = slice_arr[index_bad]
            d[i, :, :, :] = tmp

    h5r.close()

Поскольку я не знаком с многопроцессорностью, буду благодарен, если вы дадите мне отзыв и некоторые исправления к моему первому подходу к переписыванию сценария с помощью multiprocessing:

#!/usr/bin/env python3


import os
import sys
import h5py as h5
import numpy as np
from multiprocessing import Pool, Lock, Array


def processing(i):
    global result_lock
    global d
    global mask
    global mask_3D_inv


    with result_lock:  
        d.resize((i+1,16,512,128))
        slice_arr = mask[i,:,:,:]
        slice_arr = np.array(slice_arr)
        index_bad = np.where(slice_arr > 0)

        slice_arr_upt = np.zeros_like(slice_arr)
        slice_arr_upt[index_bad] = 1

        tmp = np.bitwise_or(slice_arr_upt, mask_3D_inv)
        tmp[index_bad] = slice_arr[index_bad] # add real value of bad pixels
        d[i, :, :, :] = tmp


def init_worker(res_lck, d_buf):
    global result_lock
    global d
    global mask
    global mask_3D_inv

    d = np.frombuffer(d_buf, dtype=np.float32)
    result_lock = res_lck

if __name__ == "__main__":

    file_cxi = sys.argv[1]
    mask_cxi = sys.argv[2]
    mask_3D_inv_file = sys.argv[3]
    num = sys.argv[4]
    file_cxi_copy = sys.argv[5]

    mask_3D = h5.File(mask_3D_inv_file, 'r')
    mask_3D_inv = np.array(mask_3D['/data/data'], dtype=np.float32)
    mask_3D.close()

    h5r = h5.File(file_cxi, 'r')
    mask = h5r[mask_cxi]

    with h5.File(file_cxi_copy, 'a') as h5w:
        d = h5w.create_dataset(path_to_new_mask, (1,16,512,128), maxshape=(num,16,512,128), dtype=np.float32,chunks=(1,16,512,128), compression="gzip", compression_opts=4)
        output_array = range(num)
        tmp_buf = Array(np.float32, (16,512,128))
        result_lock = Lock()
        pool = Pool(31,
                initializer=init_worker,
                initargs=(result_lock, tmp_buf))
        pool.map(processing, output_array)
        tmp = np.frombuffer(tmp_buf, dtype=np.float32)

    h5r.close()
...