Выполнить цикл foreach параллельно или последовательно с заданным условием - PullRequest
12 голосов
/ 29 июля 2011

Я часто получаю несколько вложенных циклов foreach, и иногда при написании общих функций (например, для пакета) нет уровня, на котором можно было бы распараллелить. Есть ли способ выполнить то, что описывает макет ниже?

foreach(i = 1:I) %if(I < J) `do` else `dopar`% {
    foreach(j = 1:J) %if(I >= J) `do` else `dopar`% {
        # Do stuff
    }
}

Кроме того, есть ли способ определить, зарегистрирован ли параллельный бэкэнд, чтобы я мог избежать ненужных предупреждений? Это было бы полезно как при проверке пакетов перед отправкой CRAN, так и для того, чтобы не беспокоить пользователей, использующих R на одноядерных компьютерах.

foreach(i=1:I) %if(is.parallel.backend.registered()) `dopar` else `do`% {
    # Do stuff
}

Спасибо за ваше время.

Редактировать: Большое спасибо за все отзывы о ядрах и рабочих, и вы правы в том, что лучший способ справиться с приведенным выше примером - это переосмыслить всю установку. Я предпочел бы что-то вроде ниже идеи triu, но это по сути тот же момент. И, конечно, это можно сделать с помощью параллельной tapply, как предложил Джорис.

ij <- expand.grid(i=1:I, j=1:J)
foreach(i=ij$I, j=ij$J) %dopar% {
    myFuction(i, j)
}

Однако, пытаясь упростить ситуацию, из-за которой возникла эта тема, я пропустил некоторые важные детали. Представьте, что у меня есть две функции analyse и batch.analyse, и лучший уровень для распараллеливания может отличаться в зависимости от значений n.replicates и n.time.points.

analyse <- function(x, y, n.replicates=1000){
    foreach(r = 1:n.replicates) %do% {
        # Do stuff with x and y
    }
}
batch.analyse <- function(x, y, n.replicates=10, n.time.points=1000){
    foreach(tp = 1:time.points) %do% {
        my.y <- my.func(y, tp)
        analyse(x, my.y, n.replicates)
    }
}

Если n.time.points > n.replicates, имеет смысл распараллеливать в batch.analyse, но в противном случае имеет смысл распараллеливать в analyse. Любые идеи о том, как справиться с этим? Можно ли как-то обнаружить в analyse, если распараллеливание уже произошло?

Ответы [ 3 ]

8 голосов
/ 26 февраля 2013

Проблема, которую вы подняли, была мотивация для оператора вложенности foreach, «%:%». Если тело внутреннего цикла занимает значительное количество вычислительного времени, вы довольно безопасно используете:

foreach(i = 1:I) %:%
    foreach(j = 1:J) %dopar% {
        # Do stuff
    }

Это «развертывает» вложенные циклы, что приводит к (I * J) задачам, которые все могут выполняться параллельно.

Если тело внутреннего цикла не займет много времени, решение будет более трудным. Стандартное решение состоит в том, чтобы распараллелить внешний цикл, но это все равно может привести либо ко многим небольшим задачам (когда I большой, а J маленький), либо к нескольким большим задачам (когда I маленький, а J большой).

Мое любимое решение - использовать оператор вложенности с чанкингом задач. Вот полный пример использования doMPI backend:

library(doMPI)
cl <- startMPIcluster()
registerDoMPI(cl)
I <- 100; J <- 2
opt <- list(chunkSize=10)
foreach(i = 1:I, .combine='cbind', .options.mpi=opt) %:%
    foreach(j = 1:J, .combine='c') %dopar% {
        (i * j)
    }
closeCluster(cl)

В результате получается 20 «блоков задач», каждый из которых состоит из 10 вычислений тела цикла. Если вы хотите иметь один блок задач для каждого работника, вы можете вычислить размер блока как:

cs <- ceiling((I * J) / getDoParWorkers())
opt <- list(chunkSize=cs)

К сожалению, не все параллельные бэкэнды поддерживают разбиение задач на части. Кроме того, doMPI не поддерживает Windows.

Для получения дополнительной информации по этой теме см. Мою виньетку «Вложенные циклы Foreach» в пакете foreach:

library(foreach)
vignette('nesting')
6 голосов
/ 29 июля 2011

Если вы получите несколько вложенных циклов foreach, я бы переосмыслил свой подход.Использование параллельных версий tapply может решить многие из этих проблем.В общем, вы не должны использовать вложенное распараллеливание, поскольку это ничего вам не даст.Распараллелить внешний цикл и забыть о внутреннем цикле.

Причина проста: если у вас есть 3 соединения в кластере, внешний цикл dopar будет использовать все три.Внутренняя петля допара не сможет использовать какие-либо дополнительные соединения, поскольку их нет.Так что вы ничего не получите.Следовательно, макет, который вы даете, вообще не имеет смысла с точки зрения программирования.

На ваш второй вопрос довольно легко отвечает функция getDoParRegistered(), которая возвращает TRUE, когда бэкэнд зарегистрирован,и ЛОЖЬ в противном случае.Обратите внимание:

  • также возвращает TRUE, если зарегистрирован последовательный бэкэнд (т.е. после вызова registerDoSEQ).
  • Он также вернет TRUE после остановки кластера, но в этом случае% dopar% выдаст ошибку.

Например:

require(foreach)
require(doSNOW)
cl <- makeCluster(rep("localhost",2),type="SOCK")
getDoParRegistered()
[1] FALSE
registerDoSNOW(cl)
getDoParRegistered()
[1] TRUE
stopCluster(cl)
getDoParRegistered()
[1] TRUE

Но теперь запустив этот код:

a <- matrix(1:16, 4, 4)
b <- t(a)
foreach(b=iter(b, by='col'), .combine=cbind) %dopar%
  (a %*% b)

вернется с ошибкой:

Error in summary.connection(connection) : invalid connection

Вы можете создать дополнительную проверку.(Ужасно уродливый) хак, который вы можете использовать для проверки правильности зарегистрированного doSNOW соединения, может быть:

isvalid <- function(){
    if (getDoParRegistered() ){
      X <- foreach:::.foreachGlobals$objs[[1]]$data
      x <- try(capture.output(print(X)),silent=TRUE)
      if(is(x,"try-error")) FALSE else TRUE
    } else {
      FALSE
    }
}

, который вы можете использовать как

if(!isvalid()) registerDoSEQ()

Это будетзарегистрируйте последовательный бэкэнд, если getDoParRegistered () возвращает TRUE, но больше нет действующего соединения кластера.Но опять же, это хак, и я понятия не имею, работает ли он с другими бэкэндами или даже с другими типами кластерных типов (в основном я использую сокеты).

3 голосов
/ 29 июля 2011

В обратном порядке задаваемых вопросов:

  1. @ Joris прав в отношении проверки зарегистрированного параллельного бэкэнда. Однако обратите внимание, что существует разница между машиной, являющейся одноядерной, и тем, зарегистрирован ли параллельный бэкэнд. Проверка количества ядер является очень специфической задачей для платформы (операционной системы). В Linux это может работать для вас:

    CountUnixCPUs  <- function(cpuinfo = "/proc/cpuinfo"){
    tmpCmd  <- paste("grep processor ", cpuinfo, " | wc -l", sep = "")
    numCPU  <- as.numeric(system(tmpCmd, intern = TRUE))
    return(numCPU)
    }
    

    Редактировать: см. Ссылку @ Joris на другую страницу ниже, где приведены рекомендации для Windows и Linux. Я, вероятно, перепишу свой собственный код, по крайней мере, чтобы включить больше опций для подсчета ядер.

  2. Что касается вложенных циклов, я беру другую тактику: я готовлю таблицу параметров, а затем перебираю строки. Очень простой способ, например ::

    library(Matrix)
    ptable <- which(triu(matrix(1, ncol = 20, nrow = 20))==1, arr.ind = TRUE)
    foreach(ix_row = 1:nrow(ptable)) %dopar% { myFunction(ptable[ix_row,])}
    
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...