распараллелить вызов старым проектам R с использованием исходного кода - PullRequest
0 голосов
/ 23 октября 2019

Я в процессе обновления старых проектов R. В этом процессе мне нужно выполнить старые проекты и новые проекты.

Мне удалось запустить старые проекты последовательно (один за другим), и я пытаюсь сделать то же самое параллельно.

Эти сценарии не возвращают никакого значения, поскольку записывают в базу данных.

Это рабочий код

execute_script <- function(script_name, folder_path, required_data){
  #' This function prepares the input for a folder and a given script
  #' @param script_name: Contains the name of the executable .R script to be executed
  #' @param folder_path: Contains the folder where the script to executed is stored
  #' @param required_data: This contains the data required to run the script. 
  #' @return when finished returns TRUE to ensure it has ended 

  # I need to do this to join path on server with filename (it depends on user selection)
  script_to_execute <- paste0(folder_path,"\\", script_name)
  script_to_execute <- paste0((unlist(strsplit(script_to_execute, split='\\', fixed = TRUE))), collapse = "/")

  tryCatch( 
    expr = {    
      source(script_to_execute, local = TRUE, echo = TRUE, print.eval = TRUE, spaced = TRUE, chdir = TRUE)
    }, 
    error = function(e){

      print(paste(Sys.time(),"Excepcion execute_script: ",script_name))
      print(paste(Sys.time(),"Excepcion execute_script: ",e))
      return(FALSE)      
    }
  )
  return(TRUE) # TODO chech
}

 model_execution_sequence <- function(script_required_data, selected_scripts){
  #' Executes the scripts received in selected_projects using script_required_data
  #' @param script_required_data contains a dataframe with the input
  #' 
  #' @param selected_scripts is a datatable that contains all the information required to execute a script
  #'                    
  #' @return a boolean information to determine if execution was success or not 


  TYPE_1_MODEL <- selected_scripts[APPROACH == '1']
  if (nrow(TYPE_1_MODEL) > 0) {


    my_iters <- 1:dim(TYPE_1_MODEL)[1]
    for (x in my_iters) {
      cat("\n\n")
      a <- execute_script(TYPE_1_MODEL$FILE[x],
                            TYPE_1_MODEL$PATH[x],
                            script_required_data)

      print(paste(Sys.time(),"Executed",TYPE_1_MODEL$FILE[x],"result:",a))
    }

  }else{
    print(paste("Error: Unexpected approach named: ",selected_scripts," "))

  }
  return(TRUE)
}

Это код, который я тестировал с распараллеливанием

model_execution_foreach <- function(script_required_data, selected_models){
  #' Executes the scripts received in selected_projects using script_required_data
  #' @param script_required_data contains a dataframe with the input
  #' 
  #' @param selected_scripts is a datatable that contains all the information required to execute a script
  #'                    
  #' @return a boolean information to determine if execution was success or not 

  TYPE_1_MODEL <- selected_scripts[TYPE == '1']

  if (nrow(TYPE_1_MODEL) > 0) {
    TYPE_1_MODEL <- distinct(TYPE_1_MODEL)


    no_cores <- detectCores() - 1
    cl <- makeCluster(no_cores, type = "PSOCK")

    foreach (i = 1:dim(TYPE_1_MODEL)[1] ) %dopar%{
      ml<- data.frame(execute_script(TYPE_1_MODEL$FILE[i], TYPE_1_MODEL$PATH[i], script_required_data))
    }
    stopCluster(cl)

  }else{
    print(paste("Error: Unexpected TYPE named: ",selected_scripts))

  }
  return(TRUE)
}

Причина, по которой он не работает

  1. Скрипт не ломается (я помещаю его в tryCatch)
  2. Каждый скрипт должен записывать данные в базу данных, и они не являются

Я перешел по таким ссылкам, как: функция распараллеливания

Параллелизм в R

Заранее спасибо

1 Ответ

0 голосов
/ 25 октября 2019

Мне удалось решить часть проблемы.

Вместо использования foreach я использовал parLapply следующим образом:

models_names<- list("TYPE_1", "TYPE_2", "TYPE_3", "TYPE_4" )
no_cores <- detectCores() - 1
cl <- makeCluster(no_cores)
registerDoParallel(cl)
clusterExport(cl, varlist =  vars_list <-
                as.vector(ls()), envir = environment())
input <- as.data.table(data)


tryCatch({
  RESULTS <- as.data.frame(parLapply(cl, models_names, function(x) {
    source(
      BU_available_paths[[x]],
      local = TRUE,
      echo = TRUE,
      print.eval = TRUE,
      spaced = TRUE,
      chdir = TRUE
    )
    return(TRUE)
  }))
},
error = function(e) {
  stopCluster(cl)




},
finally = {
  stopCluster(cl)
  sink(log_path_main, type = "output", append = TRUE)
  print("Exit from Parallelization call ")
})


    stopCluster(cl)
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...