параллельный ввод / вывод с Xarray для NetCDF - PullRequest
0 голосов
/ 06 января 2019

Итак, у меня есть некоторый функциональный код для проекта проверки прогноза, над которым я работал (опубликовано ниже). Код будет проходить через список файлов NetCDF (по одному на каждую начальную дату, на которую у меня есть прогнозы) и помещать в память все, что я выбрал для распределения (независимо от того, какой текущий компьютер я использую, разрешает).

Как только данные, необходимые для проверки прогноза, находятся в памяти, я создал довольно быструю подпрограмму с использованием Numba, особенно в python (она использует инфраструктуру компилятора LLVM). Основным узким местом в этом коде является то, сколько времени занимает чтение каждого файла NetCDF по отдельности и извлечение данных из него.

Я нашел эту функцию здесь , которая, кажется, поможет мне ускорить этот процесс, и мой вопрос, есть ли что-то подобное, что я мог бы реализовать в Python, чтобы помочь мне сократить время, потраченное на чтение нескольких файлов с помощью Xarray?

Код

Примечание: я избавился от некоторого кода для краткости и просто для общего представления о том, что я делаю. Я прокомментировал, где медленная часть моего кода, где я перебираю каждый файл для извлечения данных из него.

import xarray as xr
import numpy as np
import os
import numba as nb


def compute_all(work_dir, memory_to_allocate_gb):
    array_size_bytes = 3060  # Based on 15 x 51 member array
    memory_to_allocate_bytes = memory_to_allocate_gb * 1e9

    # Compute a bunch of stuff regarding memory allocation here

    for chunk_number in range(num_chunk_iterations):

        ### >>>>>>> This is where the code is slowest <<<<<<<<<
        for file_number, file in enumerate(files):

            print("\tFile Number: ", file_number)

            tmp_dataset = xr.open_dataset(file)

            tmp_forecast_array = tmp_dataset["Qout"].data[start_chunk:end_chunk, :, :]
            tmp_initialization_array = tmp_dataset["initialization_values"].data[start_chunk:end_chunk]
            tmp_dataset.close()

            for forecast_day in range(15):
                big_forecast_data_array[forecast_day, :, file_number, :] = tmp_forecast_array[:, forecast_day, :]

            big_initialization_array[file_number, :] = tmp_initialization_array

        rivids_chunk = rivids[start_chunk:end_chunk]

        # >>>> This is the fast LLVM compiled part <<<<<
        results_array = numba_calculate_metrics(
            big_forecast_data_array, big_initialization_array, len(files), chunk_size, 15
        )


if __name__ == "__main__":

    workspace = r'/directory/with/all/files'
    MEMORY_TO_ALLOCATE = 2.0  # GB
    compute_all(workspace, MEMORY_TO_ALLOCATE)
...