R doParallel Индикатор выполнения для мониторинга выполненных работ - PullRequest
4 голосов
/ 20 октября 2019

Я пытаюсь написать пакет CRAN с многопоточными возможностями. Я достиг идеального решения с doSNOW, но команда CRAN пометила пакет как «замененный», и они попросили меня перейти на решение doParallel. Это нормально, однако я не смог найти способ отслеживать количество выполненных заданий, используя doParallel так же, как я делал с doSNOW. Вот мое doSNOW решение:

# Set up parameters
nthreads<-2
nreps<-100
funrep<-function(i){
    Sys.sleep(0.1)
    res<-c(log2(i),log10(i))
    return(res)
}
# doSNOW solution
library(doSNOW)
cl<-makeCluster(nthreads)
registerDoSNOW(cl)
pb<-txtProgressBar(0,nreps,style=3)
progress<-function(n){
    setTxtProgressBar(pb,n)
}
opts<-list(progress=progress)
i<-0
output<-foreach(i=icount(nreps),.combine=c,.options.snow=opts) %dopar% {
    s<-funrep(i)
    return(s)
}
close(pb)
stopCluster(cl)

А вот решение doParallel, как предлагалось в предыдущем сообщении переполнения стека . Однако, как вы можете видеть, он не печатает прогресс по мере выполнения заданий, он просто делает это, когда результаты объединяются, в самом конце.

# doParallel solution
library(doParallel)
progcombine<-function(){
  count<-0
  function(...) {
    count<<-count+length(list(...))
    setTxtProgressBar(pb,count)
    utils::flush.console()
    c(...)
  }
}
cl <- makeCluster(nthreads)
registerDoParallel(cl)
output<-foreach(i = icount(nreps),.combine=progcombine()) %dopar% {
    funrep(i)
}
stopCluster(cl)

Можете ли вы предложить мне решение дляконтролировать завершение состояния задания, используя doParallel или, по крайней мере, без использования замененного doSNOW? Возможно, с индикатором выполнения, а также возможно с поддержкой нескольких ОС. Большое спасибо!

Ответы [ 3 ]

2 голосов
/ 20 октября 2019

Я не смог найти решение с doParallel (я не думаю, что он поддерживает индикаторы выполнения для завершения задания), но, возможно, вы можете попробовать новый pbabbly :

# pbapply solution
library(pbapply)
cl<-parallel::makeCluster(nthreads)
invisible(parallel::clusterExport(cl=cl,varlist=c("nreps")))
invisible(parallel::clusterEvalQ(cl=cl,library(utils)))
result<-pblapply(cl=cl,
                 X=1:nreps,
                 FUN=funrep)
parallel::stopCluster(cl)
1 голос
/ 21 октября 2019

(отказ от ответственности: я являюсь автором пакета progressr и будущей платформы)

Пакет progressr (в настоящее время доступен только на GitHub) может достичь этого при использовании doFuture в качестве параллельного бэкэнда для foreach :

library(progressr) ## use progressr for procession updates
library(doFuture)  ## attaches also foreach and future
registerDoFuture() ## tell foreach to use futures
plan(multisession) ## parallelize over a local PSOCK cluster

xs <- 1:5

with_progress({
  p <- progressor(along = xs) ## create a 5-step progressor
  y <- foreach(x = xs) %dopar% {
    p()                       ## signal a progression update
    Sys.sleep(6.0-x)
    sqrt(x)
  }
})

По умолчанию используется utils::txtProgressBar() для отчетов о прогрессии, но вы можете изменить это. Например, следующее будет сообщать об обновлениях прогрессии через progress::progress_bar() и beepr::beep():

progressr::handler("progress", "beepr")

Вы также можете добавлять сообщения для каждого обновления прогрессии, например,

p(sprintf("x=%g", x))

FYI, plan(multisession, workers = 2) - это сокращение от plan(cluster, workers = cl), где cl в основном cl <- parallel::makeCluster(2L).

PS. Цель пакета Progressr - предоставить минимальный, устойчивый, расширяемый и унифицированный API для обновлений прогресса. Это, хотя и инвариантно к тому, какой итератор каркас используется.

PPS. API прогрессера находится в стадии разработки;может пройти некоторое время, прежде чем он определит свое истинное «я».

1 голос
/ 20 октября 2019

Решение с использованием surveillance::plapply. Обработка индикатора выполнения реализована в функции для распараллеливания.

funrep2 <- function(i){
  i <<- i + 1
  setTxtProgressBar(pb, i)
  res <- c(log2(i), log10(i))
  Sys.sleep(0.1)
  return(res)
}

После экспорта объектов в кластеры обработка до сих пор аналогична вашему parallel решению:

library(parallel)
pb <- txtProgressBar(max=nreps, style=3)
cl <- makeCluster(nthreads)
clusterExport(cl, c("pb", "funrep2"), envir=environment())
clusterEvalQ(cl, library(surveillance))
i <- 0
output <- surveillance::plapply(1:nreps, "funrep2")
stopCluster(cl)
close(pb)

# |=============================================================================...     |  85%
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...