Вложенная параллельная обработка с условной логической ошибкой - PullRequest
0 голосов
/ 12 ноября 2018

Это немного сложно, так что я не думаю, что было бы целесообразно поделиться точным кодом, с которым я работаю, но я должен быть в состоянии достаточно хорошо разобраться с этим с помощью псевдокода:

Немного предыстории: по сути, я пытаюсь выполнять параллельные вычисления по вложенному циклу операций.У меня есть две большие функции, первая должна запускаться и возвращать TRUE для запуска второй функции, и если вторая функция запускается, она должна пройти через несколько итераций.Теперь это вложенный цикл, потому что мне нужно выполнить всю вышеуказанную операцию несколько раз для разных сценариев.Ниже указан псевдокод, который я пытаюсь использовать:

Output <- foreach(1 to “I”, .packages=packages, .combine=rbind) %:%  
    Run the first function  
    If the first function is false:  
        Print and record  
    Else:  
        Foreach(1 to J, .packages=packages, .combine=rbind) %dopar%{  
            Run the second function  
            Create df summarizing each loop of second function  
        }  

Вот упрощенная версия того, что я пытаюсь сделать, и ошибка, с которой я сталкиваюсь:

library(doParallel)
library(foreach)
func1 <- function(int1){
  results <- list(int1,TRUE)
  return(results)
}
func2 <- function(int2){
  return(int1/int2)
}

int1list <- seq(1,10)
int2list <- seq(1,15)

out <- foreach(i=1:length(int1list),.combine=rbind) %:%
  out1 <- func1(i)
  if(out1[[2]]==FALSE){
    print("fail")
    next
  } else{
    foreach(j=1:length(int2),.combine=rbind) %dopar% {
      int3 <- func2(j)
      data.frame("Scenario"=i,"Result"=int3)
    }
  }

Ошибка:Ошибка в func1 (i): объект 'i' не найден

Когда я запускаю вышеизложенное, это, по сути, говорит мне, что он даже не может найти объект «I», что, как я полагаю, происходит, потому что язапуск вещей, которые называют «я», за пределами самого внутреннего цикла.У меня была возможность заставить работать вложенные параллельные циклы раньше, но у меня не было ничего, что нужно было запускать вне внутреннего цикла, поэтому я предполагаю, что это проблема с пакетом, не знающим порядок выполнения вещей.

У меня есть обходной путь, при котором я могу просто запустить первую функцию параллельно, а затем запустить вторую функцию параллельно на основе результатов первого цикла (по сути, двух отдельных циклов вместо вложенного цикла), но я былИнтересно, есть ли способ заставить работать что-то вроде вложенного цикла, потому что я думаю, что это будет более эффективно.При запуске в рабочем режиме этот код, вероятно, займет несколько часов, поэтому стоит сэкономить время.

Ответы [ 2 ]

0 голосов
/ 13 ноября 2018

Я ценю помощь, оказанную r2evans, хотя я на самом деле не смог воспроизвести его работу из-за своей неопытности и неспособности выяснить, как заставить ncat работать на моем компьютере, он помог мне понять, что мой оригинальный метод не сможет Он работает так же, как и на два отдельных параллельных цикла foreach, которые я получил в рабочую рабочую версию на данный момент.

Это оригинальное предлагаемое решение:

library(doParallel)
library(foreach)

cl <- makeCluster(detectCores())
registerDoParallel(cl)

func1 <- function(int1){
  results <- list(int1,int1>2)
  return(results)
}
func2 <- function(int1,int2){
  return(int1/int2)
}
int1list <- seq(1,3)
int2list <- seq(1,5)
out <- foreach(i=1:length(int1list),.combine=rbind) %do% {
  out1 <- func1(int1list[i])
  if(!out1[[2]]){
    data.frame("Scenario"=i, "Result"=out1[[1]], UsedJ=FALSE)
    # next
  } else{
    foreach(j=1:length(int2list),.combine=rbind) %dopar% {
      int3 <- func2(out1[[1]], int2list[j])
      data.frame("Scenario"=i,"Result"=int3, UsedJ=TRUE)
    }
  }
}

stopCluster(cl)
registerDoSEQ()

out

Однако, это приводит к циклу, который ожидает завершения первой итерации func2 итераций func1 перед началом второй и итераций func1. Я решил разделить это на две отдельные петли, как показано ниже:

library(doParallel)
library(foreach)

cl <- makeCluster(detectCores())
registerDoParallel(cl)

func1 <- function(int1){
  results <- list(int1,int1>2)
  return(results)
}
func2 <- function(int1,int2){
  return(int1/int2)
}
int1list <- seq(1,3)
int2list <- seq(1,5)

out1 <- foreach(i=1:length(int1list)) %dopar%{
  func1(i)
}

finalOut <- data.frame("Scenario"=integer(),"UsedJ"=logical(),"Result"=double())

for (i in 1:length(int1list)){
  if(out1[[2]]==FALSE){
    tempOut <- data.frame("Scenario"=i,"UsedJ"=FALSE,"Result"=NA)
  } else{
    tempOutput <- foreach(j=1:length(int2list),.combine=rbind) %dopar% {
      Result <- func2(i,j)
      data.frame("Scenario"=i,"UsedJ"=TRUE,"Result"=Result)
    }
  }
}

stopCluster(cl)
registerDoSEQ()

finalOut

Этот алгоритм, похоже, подходит для моих целей. Он не так эффективен, как мог бы, но должен выполнять свою работу и не быть слишком расточительным.

0 голосов
/ 12 ноября 2018

Я не профессионал в foreach, но есть несколько вещей, которые выделяются:

  • func2 ссылается на int1 и int2, но оно дается только последним; это может быть артефактом вашего упрощенного примера, а может и нет?
  • ваш код должен быть заключен в фигурный блок, т. Е. Вам нужно перейти с

    out <- foreach(i=1:length(int1list),.combine=rbind) %:%
      out1 <- func1(i)
      if(out1[[2]]==FALSE) ...
    

    до

    out <- foreach(i=1:length(int1list),.combine=rbind) %:% {
      out1 <- func1(i)
      if(out1[[2]]==FALSE) ...
    }
    
  • документы для foreach предполагают, что бинарный оператор %:% является оператором вложенности, который используется между двумя foreach вызовами, но вы этого не делаете. Я думаю, что он работает правильно с %do% (или %dopar%)
  • Я не думаю, что print хорошо работают в параллельных циклах foreach ... это может работать, найти на главном узле, но не на всех остальных, ref: Как мне печатать при использовании% dopar %
  • возможно, снова из-за упрощенного примера, вы определяете, но на самом деле не используете содержимое int1list (только его длина), я исправлю в этом примере
  • next работает в "нормальных" R-циклах, а не в этих специализированных foreach -петлях; это не проблема, поскольку ваша if / else структура обеспечивает тот же эффект

Вот ваш пример, слегка измененный для учета всего вышеперечисленного. Я добавляю UsedJ, чтобы указать

library(doParallel)
library(foreach)
func1 <- function(int1){
  results <- list(int1,int1>2)
  return(results)
}
func2 <- function(int1,int2){
  return(int1/int2)
}
int1list <- seq(1,3)
int2list <- seq(1,5)
out <- foreach(i=1:length(int1list),.combine=rbind) %do% {
  out1 <- func1(int1list[i])
  if(!out1[[2]]){
    data.frame("Scenario"=i, "Result"=out1[[1]], UsedJ=FALSE)
    # next
  } else{
    foreach(j=1:length(int2list),.combine=rbind) %dopar% {
      int3 <- func2(out1[[1]], int2list[j])
      data.frame("Scenario"=i,"Result"=int3, UsedJ=TRUE)
    }
  }
}
out
#   Scenario Result UsedJ
# 1        1   1.00 FALSE
# 2        2   2.00 FALSE
# 3        3   3.00  TRUE
# 4        3   1.50  TRUE
# 5        3   1.00  TRUE
# 6        3   0.75  TRUE
# 7        3   0.60  TRUE

Редактировать

Если вы не видите распараллеливания, возможно, это потому, что вы еще не настроили «кластер». Есть также несколько других изменений в рабочем процессе, чтобы заставить его хорошо распараллеливаться, основанный на методе вложенных циклов foreach с оператором %:%.

Чтобы «доказать», что это работает параллельно, я добавил несколько журналов, основанных на Как я могу печатать при использовании% dopar% (потому что параллельные процессы не print, как можно было бы надеюсь).

library(doParallel)
library(foreach)
Log <- function(text, ..., .port = 4000, .sock = make.socket(port=.port)) {
  msg <- sprintf(paste0(as.character(Sys.time()), ": ", text, "\n"), ...)
  write.socket(.sock, msg)
  close.socket(.sock)
}
func1 <- function(int1) {
  Log(paste("func1", int1))
  Sys.sleep(5)
  results <- list(int1, int1 > 2)
  return(results)
}
func2 <- function(int1, int2) {
  Log(paste("func2", int1, int2))
  Sys.sleep(1)
  return(int1 / int2)
}

Использование кода регистрации требует внешнего способа чтения из этого сокета. Я использую netcat (nc или Nmap's ncat) с ncat -k -l 4000 здесь. Это, конечно, не требуется для работы, но здесь удобно посмотреть, как идут дела. (Примечание: этот слушатель / сервер должен работать до того, как вы попытаетесь использовать Log.)

Мне не удалось заставить вложенные "foreach -> func1 -> foreach -> func2" правильно распараллелить func2. Исходя из снов, это должно занять 5 секунд для трех вызовов до func1 и 2 секунды (две партии по три каждого) для пяти вызовов до func2, но это займет 10 секунд (три параллельных вызова до * 1075) *, затем пять последовательных вызовов func2):

system.time(
  out <- foreach(i=1:length(int1list), .combine=rbind, .packages="foreach") %dopar% {
    out1 <- func1(int1list[i])
    if (!out1[[2]]) {
      data.frame(Scenario=i, Result=out1[[1]], UsedJ=FALSE)
    } else {
      foreach(j=1:length(int2list), .combine=rbind) %dopar% {
        int3 <- func2(out1[[1]], int2list[j])
        data.frame(Scenario=i, Result=int3, UsedJ=TRUE)
      }
    }
  }
)
#    user  system elapsed 
#    0.02    0.00   10.09 

с соответствующим выходом консоли:

2018-11-12 11:51:17: func1 2
2018-11-12 11:51:17: func1 1
2018-11-12 11:51:17: func1 3
2018-11-12 11:51:23: func2 3 1
2018-11-12 11:51:24: func2 3 2
2018-11-12 11:51:25: func2 3 3
2018-11-12 11:51:26: func2 3 4
2018-11-12 11:51:27: func2 3 5

(обратите внимание, что заказ не гарантируется.)

Итак, мы можем разбить его на вычислительные func1 вещи:

system.time(
  out1 <- foreach(i = seq_along(int1list)) %dopar% {
    func1(int1list[i])
  }
)
#    user  system elapsed 
#    0.02    0.01    5.03 
str(out1)
# List of 3
#  $ :List of 2
#   ..$ : int 1
#   ..$ : logi FALSE
#  $ :List of 2
#   ..$ : int 2
#   ..$ : logi FALSE
#  $ :List of 2
#   ..$ : int 3
#   ..$ : logi TRUE

Консоль

2018-11-12 11:53:21: func1 2
2018-11-12 11:53:21: func1 1
2018-11-12 11:53:21: func1 3

затем работайте над func2 материалом:

system.time(
  out2 <- foreach(i = seq_along(int1list), .combine="rbind") %:%
    foreach(j = seq_along(int2list), .combine="rbind") %dopar% {
      Log(paste("preparing", i, j))
      if (out1[[i]][[2]]) {
        int3 <- func2(out1[[i]][[1]], j)
        data.frame(i=i, j=j, Result=int3, UsedJ=FALSE)
      } else if (j == 1L) {
        data.frame(i=i, j=NA_integer_, Result=out1[[i]][[1]], UsedJ=FALSE)
      }
    }
)
#    user  system elapsed 
#    0.03    0.00    2.05 
out2
#   i  j Result UsedJ
# 1 1 NA   1.00 FALSE
# 2 2 NA   2.00 FALSE
# 3 3  1   3.00 FALSE
# 4 3  2   1.50 FALSE
# 5 3  3   1.00 FALSE
# 6 3  4   0.75 FALSE
# 7 3  5   0.60 FALSE

Две секунды (первая партия из трех - 1 секунда, вторая партия из двух - 1 секунда) - это то, что я ожидал. Консоль:

2018-11-12 11:54:01: preparing 1 2
2018-11-12 11:54:01: preparing 1 3
2018-11-12 11:54:01: preparing 1 1
2018-11-12 11:54:01: preparing 1 4
2018-11-12 11:54:01: preparing 1 5
2018-11-12 11:54:01: preparing 2 1
2018-11-12 11:54:01: preparing 2 2
2018-11-12 11:54:01: preparing 2 3
2018-11-12 11:54:01: preparing 2 4
2018-11-12 11:54:01: preparing 2 5
2018-11-12 11:54:01: preparing 3 1
2018-11-12 11:54:01: preparing 3 2
2018-11-12 11:54:01: func2 3 1
2018-11-12 11:54:01: preparing 3 3
2018-11-12 11:54:01: func2 3 2
2018-11-12 11:54:01: func2 3 3
2018-11-12 11:54:02: preparing 3 4
2018-11-12 11:54:02: preparing 3 5
2018-11-12 11:54:02: func2 3 4
2018-11-12 11:54:02: func2 3 5

Вы видите, что func2 вызывается правильно пять раз. К сожалению, вы видите, что внутри цикла происходит большое "вращение". Конечно, это фактически не работает (о чем свидетельствует время выполнения 2.05 секунды), поэтому нагрузка на узлы незначительна.

Если у кого-то есть способ предотвратить это ненужное вращение, я приветствую комментарии или «конкурирующие» ответы.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...