многопроцессорная обработка с массивом Xarray и Numpy - PullRequest
0 голосов
/ 08 января 2019

Итак, я пытаюсь реализовать решение, которое уже было описано здесь , но я немного его изменяю. Вместо того, чтобы просто пытаться изменить массив с помощью операций, я пытаюсь читать из файла NetCDF с помощью xarray, а затем записывать в общий массив с помощью многопроцессорного модуля.

Мне кажется, что я довольно близко подхожу, но что-то идет не так. Я вставил воспроизводимый, простой пример копирования / вставки ниже. Как вы можете видеть, когда я запускаю процессы, все они могут читать созданные мной файлы, но они не корректно обновляют общий массив numpy, в который я пытаюсь записать. Любая помощь будет оценена.

Код

import ctypes
import logging
import multiprocessing as mp
import xarray as xr

from contextlib import closing

import numpy as np

info = mp.get_logger().info


def main():

    data = np.arange(10)

    for i in range(4):
        ds = xr.Dataset({'x': data})
        ds.to_netcdf('test_{}.nc'.format(i))

        ds.close()


    logger = mp.log_to_stderr()
    logger.setLevel(logging.INFO)

    # create shared array
    N, M = 4, 10
    shared_arr = mp.Array(ctypes.c_float, N * M)
    arr = tonumpyarray(shared_arr, dtype=np.float32)
    arr = arr.reshape((N, M))

    # Fill with random values
    arr[:, :] = np.zeros((N, M))
    arr_orig = arr.copy()

    files = ['test_0.nc', 'test_1.nc', 'test_2.nc', 'test_3.nc']

    parameter_tuples = [
        (files[0], 0),
        (files[1], 1),
        (files[2], 2),
        (files[3], 3)
    ]

    # write to arr from different processes
    with closing(mp.Pool(initializer=init, initargs=(shared_arr,))) as p:
        # many processes access different slices of the same array
        p.map_async(g, parameter_tuples)
    p.join()

    print(arr_orig)
    print(tonumpyarray(shared_arr, np.float32).reshape(N, M))


def init(shared_arr_):
    global shared_arr
    shared_arr = shared_arr_  # must be inherited, not passed as an argument


def tonumpyarray(mp_arr, dtype=np.float64):
    return np.frombuffer(mp_arr.get_obj(), dtype)


def g(params):
    """no synchronization."""
    print("Current File Name: ", params[0])

    tmp_dataset = xr.open_dataset(params[0])

    print(tmp_dataset["x"].data[:])

    arr = tonumpyarray(shared_arr)
    arr[params[1], :] = tmp_dataset["x"].data[:]

    tmp_dataset.close()


if __name__ == '__main__':
    mp.freeze_support()
    main()

1 Ответ

0 голосов
/ 08 января 2019

Что не так?

1.Вы забыли изменить форму после tonumpyarray.
2. Вы использовали неправильный dtype в tonumpyarray.

код

import ctypes
import logging
import multiprocessing as mp
import xarray as xr

from contextlib import closing

import numpy as np

info = mp.get_logger().info


def main():

    data = np.arange(10)

    for i in range(4):
        ds = xr.Dataset({'x': data})
        ds.to_netcdf('test_{}.nc'.format(i))

        ds.close()


    logger = mp.log_to_stderr()
    logger.setLevel(logging.INFO)

    # create shared array
    N, M = 4, 10
    shared_arr = mp.Array(ctypes.c_float, N * M)
    arr = tonumpyarray(shared_arr, dtype=np.float32)
    arr = arr.reshape((N, M))

    # Fill with random values
    arr[:, :] = np.zeros((N, M))
    arr_orig = arr.copy()

    files = ['test_0.nc', 'test_1.nc', 'test_2.nc', 'test_3.nc']

    parameter_tuples = [
        (files[0], 0),
        (files[1], 1),
        (files[2], 2),
        (files[3], 3)
    ]

    # write to arr from different processes
    with closing(mp.Pool(initializer=init, initargs=(shared_arr, N, M))) as p:
        # many processes access different slices of the same array
        p.map_async(g, parameter_tuples)
    p.join()

    print(arr_orig)
    print(tonumpyarray(shared_arr, np.float32).reshape(N, M))


def init(shared_arr_, N_, M_):    # add shape
    global shared_arr
    global N, M
    shared_arr = shared_arr_  # must be inherited, not passed as an argument
    N = N_
    M = M_


def tonumpyarray(mp_arr, dtype=np.float32):  # change type
    return np.frombuffer(mp_arr.get_obj(), dtype)


def g(params):
    """no synchronization."""
    print("Current File Name: ", params[0])

    tmp_dataset = xr.open_dataset(params[0])

    print(tmp_dataset["x"].data[:])

    arr = tonumpyarray(shared_arr).reshape(N, M)   # reshape
    arr[params[1], :] = tmp_dataset["x"].data[:]

    tmp_dataset.close()


if __name__ == '__main__':
    mp.freeze_support()
    main()
...