Слияние списка в параллельном прогоне в R - PullRequest
0 голосов
/ 21 мая 2018

У меня есть функция, которую я буду повторять 40000 раз, я хочу сделать ее параллельной, пробовал с библиотекой doParallel с определенным размером пакета 200.

Вывод находится в списке, и я хочу накопитьсуммировать поэлементно.Каждый объект в выводе довольно большой, поэтому я должен разделить его на пакеты.

Однако время выполнения становится очень медленным, так как я запускаю более одного пакета, 1 отдельный пакет занимает 11 с.время, 10 партий занимают 160 секунд.Кто-нибудь имеет представление о том, что я делаю неправильно.

library(doParallel)
myCluster <- makeCluster(3, # number of cores to use
                         type = "PSOCK") # type of cluster
registerDoParallel(myCluster)
for(i in 1:1){
    result = foreach(j=((i-1)*batch_size+1):(i*batch_size)) %dopar% {
        some_function(input_data[j,])
    }
    result_batch_tmp <- Reduce("+",result)
    if(i==1) result_batch <- list(result_batch_tmp) 
    else result_batch <- c(result_batch,result_batch2)
    rm(result)
    rm(result_batch_tmp) 
}
stopCluster(myCluster)

Ответы [ 2 ]

0 голосов
/ 22 мая 2018

Я удалил большую часть кода, но то, что я прикрепляю, вызывает ту же проблему, оно становится намного медленнее, когда я увеличиваю количество прогонов

step_calc_date <- function(calc_date){
  if(substr(calc_date,5,6)==12) return(floor(calc_date/10000)*10000+10101)
  else return(calc_date+100)
}


some_function <- function(input){
  calc_date<- 20180401
  t=0
  for(i in 1:1400){
    t=t+1
    calc_date=step_calc_date(calc_date)#step 1 period
    state <- t(c(0,0,0,0,0,0,0,0,0,1))
    state_prev_period = state
  }
  BE_period = rep(1,120*12+1)
  rm(input)
  return(BE_period[1])
}
0 голосов
/ 21 мая 2018

Что бы я сделал:

Данные

input_data <- as.matrix(iris[rep(1:150, 1000), rep(1:4, 100)])
dimnames(input_data) <- list(NULL, NULL)
batch_size <- 1e3
some_function <- mean

Вычисления

library(doParallel)
myCluster <- makeCluster(3, # number of cores to use
                         type = "PSOCK") # type of cluster
registerDoParallel(myCluster)

batchs <- 1

# Your solution
system.time({
  result_all <- foreach(i = seq_len(batchs)) %do% {
    ind.batch  <- ((i-1)*batch_size+1):(i*batch_size)
    result <- foreach(j = ind.batch) %dopar% {
      some_function(input_data[j, ])
    }
    Reduce("+", result)
  }
  result_all <- Reduce("+", result_all)
})

# My solution
system.time({
  result_all <- foreach(i = seq_len(batchs)) %do% {
    ind.batch  <- ((i-1)*batch_size+1):(i*batch_size)
    data.batch <- input_data[ind.batch, ]
    result <- foreach(j = seq_along(ind.batch)) %dopar% {
      some_function(data.batch[j, ])
    }
    Reduce("+", result)
  }
  result_all <- Reduce("+", result_all)
})

stopCluster(myCluster)

Время:

  • 1 -> 0,35 против3,5
  • 10 -> 3,9 против 35,9
  • 30 -> 11,3 против 106

Объяснение:

Не удалось полностью воспроизвести вашу проблемуно я думаю, что если вы используете input_data[j,] в foreach, вы скопируете все данные в каждый кластер.Вот почему я создаю data.batch, а затем применяю foreach -параллельный цикл.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...