Итак, я пытаюсь реализовать решение, которое уже было описано здесь , но я немного его изменяю. Вместо того, чтобы просто пытаться изменить массив с помощью операций, я пытаюсь читать из файла NetCDF с помощью xarray, а затем записывать в общий массив с помощью многопроцессорного модуля.
Мне кажется, что я довольно близко подхожу, но что-то идет не так. Я вставил воспроизводимый, простой пример копирования / вставки ниже. Как вы можете видеть, когда я запускаю процессы, все они могут читать созданные мной файлы, но они не корректно обновляют общий массив numpy, в который я пытаюсь записать. Любая помощь будет оценена.
Код
import ctypes
import logging
import multiprocessing as mp
import xarray as xr
from contextlib import closing
import numpy as np
info = mp.get_logger().info
def main():
data = np.arange(10)
for i in range(4):
ds = xr.Dataset({'x': data})
ds.to_netcdf('test_{}.nc'.format(i))
ds.close()
logger = mp.log_to_stderr()
logger.setLevel(logging.INFO)
# create shared array
N, M = 4, 10
shared_arr = mp.Array(ctypes.c_float, N * M)
arr = tonumpyarray(shared_arr, dtype=np.float32)
arr = arr.reshape((N, M))
# Fill with random values
arr[:, :] = np.zeros((N, M))
arr_orig = arr.copy()
files = ['test_0.nc', 'test_1.nc', 'test_2.nc', 'test_3.nc']
parameter_tuples = [
(files[0], 0),
(files[1], 1),
(files[2], 2),
(files[3], 3)
]
# write to arr from different processes
with closing(mp.Pool(initializer=init, initargs=(shared_arr,))) as p:
# many processes access different slices of the same array
p.map_async(g, parameter_tuples)
p.join()
print(arr_orig)
print(tonumpyarray(shared_arr, np.float32).reshape(N, M))
def init(shared_arr_):
global shared_arr
shared_arr = shared_arr_ # must be inherited, not passed as an argument
def tonumpyarray(mp_arr, dtype=np.float64):
return np.frombuffer(mp_arr.get_obj(), dtype)
def g(params):
"""no synchronization."""
print("Current File Name: ", params[0])
tmp_dataset = xr.open_dataset(params[0])
print(tmp_dataset["x"].data[:])
arr = tonumpyarray(shared_arr)
arr[params[1], :] = tmp_dataset["x"].data[:]
tmp_dataset.close()
if __name__ == '__main__':
mp.freeze_support()
main()