R - применить функцию к каждому элементу массива параллельно - PullRequest
1 голос
/ 02 июля 2019

У меня есть измерения максимальной и минимальной температуры и осадков, которые организованы в виде массивов размеров (100x96x50769), где i и j - ячейки сетки со связанными координатами, а z - количество измерений во времени.

Концептуально это выглядит так:

enter image description here

Я использую пакет climdex.pcic для расчета индексов экстремальных погодных явлений.Учитывая временные ряды максимальной и минимальной температуры и осадков, функция climdexInput.raw вернет объект climdexIput, который можно использовать для определения нескольких показателей: количества морозных дней, количества летних дней, последовательных сухих дней и т. Д.

Вызов функции довольно прост:

ci <- climdexInput.raw(tmax=x, tmin=y, prec=z,
                       t, t, t, base.range=c(1961,1990))

, где x - вектор максимальных температур, y - вектор минимальных температур, z - вектор осадков, а t - вектор.с датами, под которыми были измерены x, y и z.

Что я хотел бы сделать, это извлечь временные ряды для каждого элемента моего массива (т.е. каждой ячейки сетки на рисунке выше) и использовать его для запускафункция climdexInput.raw.

Из-за большого количества элементов реальных данных я хочу выполнить эту задачу параллельно на моем 4-ядерном сервере Linux.Однако у меня нет опыта с распараллеливанием в R.

Вот один пример моей программы (с намеренно уменьшенными размерами для ускорения выполнения на вашем компьютере):

library(climdex.pcic)

# Create some dates
t <- seq(as.Date('2000-01-01'), as.Date('2010-12-31'), 'day')

# Parse the dates into PCICt
t <- as.PCICt(strftime(t), cal='gregorian')

# Create some dummy weather data, with dimensions `# of lat`, `# of lon` and `# of timesteps`
nc.min <- array(runif(10*9*4018, min=0, max=15), c(10, 9, 4018))
nc.max <- array(runif(10*9*4018, min=25, max=40), c(10, 9, 4018))
nc.prc <- array(runif(10*9*4018, min=0, max=25), c(10, 9, 4018))

# Create "ci" object
ci <- climdexInput.raw(tmax=nc.max[1,1,], tmin=nc.min[1,1,], prec=nc.prc[1,1,],
                       t, t, t, base.range=c(2000,2005))

# Once you have “ci”, you can compute any of the indices provided by the climdex.pcic package.
# The example below is for cumulative # of dry days per year:
cdd <- climdex.cdd(ci, spells.can.span.years = TRUE) 

Теперь, пожалуйста, обратите вниманиечто в приведенном выше примере я использовал только первый элемент моего массива ([1,1,]) в качестве примера в функции climdexInput.raw.

Как можно сделать то же самое для всех элементов, используя преимущества параллельногообработка, возможно, путем зацикливания размеров i и j моего массива?

1 Ответ

1 голос
/ 02 июля 2019

Вы можете использовать foreach для этого:

library(doParallel)
registerDoParallel(cl <- makeCluster(3))
res <- foreach(j = seq_len(ncol(nc.min))) %:% 
  foreach(i = seq_len(nrow(nc.min))) %dopar% {
    ci <- climdex.pcic::climdexInput.raw(
      tmax=nc.max[i,j,], 
      tmin=nc.min[i,j,],
      prec=nc.prc[i,j,],
      t, t, t, 
      base.range=c(2000,2005)
    )
  }
stopCluster(cl)

См. Мое руководство по параллелизму с использованием foreach: https://privefl.github.io/blog/a-guide-to-parallelism-in-r/.

Затем, чтобы вычислить индекс, просто используйте climdex.cdd(res[[1]][[1]], spells.can.span.years = TRUE) (j first, i second).

...