Почему «foreach» не экспортируется в мой изменяемый объект при параллельной работе? - PullRequest
0 голосов
/ 10 апреля 2019

Я устанавливаю имитационную модель уровня пациента в R. Это требует создания двух фреймов данных для каждого пациента (с лечением и без) с течением времени (с использованием двух внутренних циклов). Затем я должен зациклить внутренние петли для каждого пациента, который требуется в модели. Результаты внутренних циклов затем сохраняются в списке в глобальной среде.

Чтобы попытаться ускорить процесс, я хочу запустить внешние циклы параллельно, используя пакет foreach. Цикл работает должным образом при использовании %do% (без параллельного запуска циклов). Однако, как только я установил значение %dopar% для параллельной работы, внутренние циклы больше не экспортируются в список в глобальной среде, и я получаю сообщение об ошибке:

Ошибка в {: задача 1 не выполнена - «объект« Данные пациента »не найден»

Я предоставил код ниже, в котором есть рабочий пример %do% и %dopar% версий моей функции внешнего цикла. Внутренние циклы были удалены из примера и просто заменены простыми вероятностными розыгрышами.

Любая помощь будет принята с благодарностью.


library(tidyverse)
library(foreach)
library(doSNOW)

# Input
rm(list = ls())
Patient_Number <- 1000

#### Create a place to store patient data generated during the simulation ####

Patient_Data <- vector("list", length = Patient_Number)


#### Function - Non-parallel ####

Run_Sim <- function(){

  cl <- makeCluster(4, type = "SOCK")
  registerDoSNOW(cl)

  # record the time the model started

  model_start <- Sys.time()

  print(noquote(paste("Time model started: ", format(Sys.time(), "%a %d %b %Y %X"), sep = "")))

  #### Simulate Patient's BCVA scores ####

  # create progress bar

  print(noquote("Simulating Patients:"))

  pb <- txtProgressBar(min = 0, max = Patient_Number, style = 3)
  progress <- function(n) setTxtProgressBar(pb, n)
  opts <- list(progress = progress)

  foreach(i = 1:Patient_Number, .packages = c("tidyverse"), .inorder = FALSE,
          .export = ls(globalenv()),
          .options.snow = opts) %do% {

            This_Patient <- list(
              Patient_ID = 0,
              Intervention = 0,
              Comparator = 0
            )

            This_Patient_Draw_Int <- rnorm(1, mean = 50, sd = 7.8) # These normally would be more complex functions generating a data frame for each patient
            This_Patient_Draw_Comp <- rnorm(1, mean = 44, sd = 10) # These normally would be more complex functions generating a data frame for each patient

            This_Patient$Patient_ID <- i
            This_Patient$Intervention <- This_Patient_Draw_Int
            This_Patient$Comparator <- This_Patient_Draw_Comp

            Patient_Data[[i]] <<- This_Patient

          }

  # stop the progress bar

  close(pb)

  # record when model finished

  model_finish <- Sys.time()
  print(noquote(paste("Time model finished: ", format(Sys.time(), "%a %d %b %Y %X"), sep = "")))

  print(noquote(paste("Model took ", round(difftime(model_finish, model_start, units = c("mins")), 0),
                      " minute(s) to simulate ", Patient_Number, " Patients", sep = "")))

  stopCluster(cl)

}

Run_Sim()


#### Parallel version using foreach %dopar% ####

rm(list = ls())
Patient_Number <- 1000
Patient_Data <- vector("list", length = Patient_Number)

Run_Sim_Para <- function(){

  cl <- makeCluster(4, type = "SOCK")
  registerDoSNOW(cl)

  # record the time the model started

  model_start <- Sys.time()

  print(noquote(paste("Time model started: ", format(Sys.time(), "%a %d %b %Y %X"), sep = "")))

  #### Simulate Patient's BCVA scores ####

  # create progress bar

  print(noquote("Simulating Patients:"))

  pb <- txtProgressBar(min = 0, max = Patient_Number, style = 3)
  progress <- function(n) setTxtProgressBar(pb, n)
  opts <- list(progress = progress)

  foreach(i = 1:Patient_Number, .packages = c("tidyverse"), .inorder = FALSE,
          .export = ls(globalenv()),
          .options.snow = opts) %dopar% {

            This_Patient <- list(
              Patient_ID = 0,
              Intervention = 0,
              Comparator = 0
            )

            This_Patient_Draw_Int <- rnorm(1, mean = 50, sd = 7.8) # These normally would be more complex functions generating a data frame for each patient
            This_Patient_Draw_Comp <- rnorm(1, mean = 44, sd = 10) # These normally would be more complex functions generating a data frame for each patient

            This_Patient$Patient_ID <- i
            This_Patient$Intervention <- This_Patient_Draw_Int
            This_Patient$Comparator <- This_Patient_Draw_Comp

            Patient_Data[[i]] <<- This_Patient

          }

  # stop the progress bar

  close(pb)

  # record when model finished

  model_finish <- Sys.time()
  print(noquote(paste("Time model finished: ", format(Sys.time(), "%a %d %b %Y %X"), sep = "")))

  print(noquote(paste("Model took ", round(difftime(model_finish, model_start, units = c("mins")), 0),
                      " minute(s) to simulate ", Patient_Number, " Patients", sep = "")))

  stopCluster(cl)

}

Run_Sim_Para()

1 Ответ

0 голосов
/ 11 апреля 2019

Я решил проблему, выполнив следующее:

  1. Создание отдельной функции, которая компилирует внутренние циклы в список
  2. Эта функция списка затем передается функции foreach
  3. Вместо использования изменяемых состояний для обновления уже существующего списка в глобальной среде, функция assign используется для передачи вывода цикла foreach объекту "Patient_Data" в глобальной среде

Пример кода ниже. Надеюсь, что это поможет другим, кто может столкнуться с подобной проблемой.

library(tidyverse)
library(foreach)
library(doSNOW)

# Input
rm(list = ls())
Patient_Number <- 1e4

#### Create a listing function which will be ran through "foreach" ####

list_func <- function(Patient_ID_Code){

  This_Patient <- list(
    Patient_ID = 0,
    Intervention = 0,
    Comparator = 0
  )

  This_Patient_Draw_Int <- rnorm(1, mean = 50, sd = 7.8) # These normally would be more complex functions generating a data frame for each patient
  This_Patient_Draw_Comp <- rnorm(1, mean = 44, sd = 10) # These normally would be more complex functions generating a data frame for each patient

  This_Patient$Patient_ID <- Patient_ID_Code
  This_Patient$Intervention <- This_Patient_Draw_Int
  This_Patient$Comparator <- This_Patient_Draw_Comp

  return(This_Patient)


}


Run_Sim_Para <- function(){

  cl <- parallel::makeCluster(parallel::detectCores() - 1)
  registerDoSNOW(cl)

  # record the time the model started

  model_start <- Sys.time()

  print(noquote(paste("Time model started: ", format(Sys.time(), "%a %d %b %Y %X"), sep = "")))

  #### Simulate Patient's BCVA scores ####

  # create progress bar

  print(noquote("Simulating Patients:"))

  pb <- txtProgressBar(min = 0, max = Patient_Number, style = 3)
  progress <- function(n) setTxtProgressBar(pb, n)
  opts <- list(progress = progress)

  test <- foreach(i = 1:Patient_Number, .packages = c("tidyverse"),
                  .export = ls(.GlobalEnv),
                  .options.snow = opts) %dopar% {

                    list_func(i)

                  }

  # stop the progress bar

  close(pb)

  # record when model finished

  model_finish <- Sys.time()
  print(noquote(paste("Time model finished: ", format(Sys.time(), "%a %d %b %Y %X"), sep = "")))

  print(noquote(paste("Model took ", round(difftime(model_finish, model_start, units = c("mins")), 0),
                      " minute(s) to simulate ", Patient_Number, " Patients", sep = "")))

  stopCluster(cl)

  assign("Patient_Data", test, envir = .GlobalEnv)

}

Run_Sim_Para()
...