xarray / dask - ограничение количества потоков / процессоров - PullRequest
0 голосов
/ 17 сентября 2018

Я довольно новичок в xarray, и в настоящее время я пытаюсь использовать его для подстановки некоторых NetCDF. Я запускаю это на общем сервере и хотел бы узнать, как лучше ограничить вычислительную мощность, используемую xarray, чтобы он хорошо сочетался с другими. Я немного прочитал документацию по dask и xarray, но мне не ясно, как установить ограничение на cpus / threads. Вот пример пространственного подмножества:

import glob
import os
import xarray as xr

from multiprocessing.pool import ThreadPool
import dask

wd = os.getcwd()

test_data = os.path.join(wd, 'test_data')
lat_bnds = (43, 50)
lon_bnds = (-67, -80)
output = 'test_data_subset'

def subset_nc(ncfile, lat_bnds, lon_bnds, output):
    if not glob.os.path.exists(output):
        glob.os.makedirs(output)
    outfile = os.path.join(output, os.path.basename(ncfile).replace('.nc', '_subset.nc'))

    with dask.config.set(scheduler='threads', pool=ThreadPool(5)):
        ds = xr.open_dataset(ncfile, decode_times=False)

        ds_sub = ds.where(
            (ds.lon >= min(lon_bnds)) & (ds.lon <= max(lon_bnds)) & (ds.lat >= min(lat_bnds)) & (ds.lat <= max(lat_bnds)),
            drop=True)
        comp = dict(zlib=True, complevel=5)
        encoding = {var: comp for var in ds.data_vars}
        ds_sub.to_netcdf(outfile, format='NETCDF4', encoding=encoding)

list_files = glob.glob(os.path.join(test_data, '*'))
print(list_files)

for i in list_files:
    subset_nc(i, lat_bnds, lon_bnds, output)

Я попробовал несколько вариантов этого, переместив конфигурацию ThreadPool, но я все еще вижу слишком большую активность в top (> 3000% активности процессора). Я не уверен, в чем проблема.

...