Использование стандартного блестящего индикатора R в параллельных вычислениях foreach - PullRequest
0 голосов
/ 05 февраля 2019

Я пытаюсь использовать стандартный блестящий индикатор R в параллельном цикле foreach с помощью серверной части doParallel.Однако это приводит к следующему сообщению об ошибке:

Предупреждение: ошибка в {: задача 1 не выполнена - "сеанс" не является объектом ShinySession. "

Код(минимальный рабочий пример)

library(shiny)
library(doParallel)

ui <- fluidPage(
  actionButton(inputId = "go", label = "Launch calculation")
)

server <- function(input, output, session) {

  workers=makeCluster(2)
  registerDoParallel(workers)

  observeEvent(input$go, {
    Runs=c(1:4)
    Test_out=foreach(i=Runs, .combine=cbind, .inorder=TRUE, .packages=c("shiny"),.export=c("session")) %dopar% { 
      pbShiny = shiny::Progress$new()
      pbShiny <- Progress$new(session,min = 0, max = 10)
      on.exit(pbShiny$close())
      test_vec=rep(0,100)

      for(i in 1:10){
        test_vec=test_vec+rnorm(100)
        pbShiny$set(message="Simulating",detail=paste(i),
                  value=i)
        Sys.sleep(0.2)
      }

    }
  })
}

shinyApp(ui = ui, server = server)

Код запускается, если я последовательно запускаю цикл foreach (используя registerDoSEQ ()).Кто-нибудь знает, как решить эту проблему?


Общая цель

  • Показать прогресс для пользователя в параллельном цикле foreach с использованием фоновой части doParallel в блестящем
  • Пользователь должен знать о количестве работников и прогрессе на одного работника и / или общем прогрессе

По следующей ссылке есть похожий вопрос, но он не был решен какрабочий пример не был предоставлен:

Использование параллельного foreach для индикатора выполнения в R Shiny

Ответы [ 2 ]

0 голосов
/ 11 февраля 2019

Я думаю, что нашел решение для случаев, когда количество прогонов превышает количество ядер.

Я искал вложенные будущие процессы и нашел следующую страницу:

https://cran.r -project.org / web / packages / future / vignettes / future-3-topologies.html

Я изменил свой код следующим образом.Это запускает задания последовательно для каждого ядра и соответственно обновляет соответствующие индикаторы выполнения.

library(shiny)
library(future)
library(promises)
library(ipc)
library(listenv)

plan(list(multiprocess, sequential))

ui <- fluidPage(
  actionButton(inputId = "go", label = "Launch calculation")
)

server <- function(input, output, session) {

  observeEvent(input$go, {
    x <- listenv()
    Runs=12 #define the number of runs
    N=availableCores()
    Tasks=rep(0,N) #Number of sequential tasks per core
    Tasks[1:(Runs-(ceiling(Runs/N)-1)*N)]=ceiling(Runs/N)
    if((Runs-(ceiling(Runs/N)-1)*N)<N){
      Tasks[(Runs-(ceiling(Runs/N)-1)*N+1):N]=ceiling(Runs/N)-1
    }

    progress = list() #A list to maintain progress for each run

    for(j in 1:N){

      for(l in 1:Tasks[j]){
        progress[[(l-1)*N+j]] = AsyncProgress$new(message=paste("Complex analysis, core ",j," , task ",l))
      }

    x[[j]]%<-%{
      for(l in 1:Tasks[j]){
        for (i in 1:10) {
          progress[[(l-1)*N+j]]$inc(1/10)
          Sys.sleep(0.5)
        }
        progress[[(l-1)*N+j]]$close()
      }
    }
    }

    NULL
  })
}

shinyApp(ui = ui, server = server)
0 голосов
/ 08 февраля 2019

Пакет doParallel является расширением параллельного пакета, как показано в документации здесь.

https://cran.r -project.org / web / packages / doParallel / doParallel.pdf

Читая документацию параллельного пакета, мы видим, что он реализует 3 различных метода для достижения параллелизма.Помните, что R - это однопоточный язык.

  1. Новый сеанс R, в котором родительский процесс взаимодействует с рабочим или дочерним процессом.
  2. Через Forking
  3. ИспользованиеСредства уровня ОС

Вы можете найти эту информацию здесь,

https://stat.ethz.ch/R-manual/R-devel/library/parallel/doc/parallel.pdf

Следствием этого является то, что дочерний процесс не можетсвязаться с родительским процессом, пока он не завершит вычисления и не вернет значение.Насколько мне известно.

Следовательно, отметка индикатора выполнения в рабочем процессе невозможна.

Полное раскрытие, я не работал с пакетом doParallel и документациейв отношении блеска было ограничено.


Альтернативное решение

Существует аналогичный пакет, однако с обширной документацией в отношении блеска.Это пакеты futures и promises и ipc.futures и promises включают асинхронное программирование, а ipc включает межпроцессное взаимодействие.Чтобы помочь нам еще больше, он также имеет функцию AsyncProgress().

Вот пример, где мы отмечаем два счетчика синхронно.

Пример

library(shiny)
library(future)
library(promises)
library(ipc)

plan(multisession)


ui <- fluidPage(
  actionButton(inputId = "go", label = "Launch calculation")
)

server <- function(input, output, session) {

  observeEvent(input$go, {

    progress = AsyncProgress$new(message="Complex analysis")

    future({
      for (i in 1:15) {
        progress$inc(1/15)
        Sys.sleep(0.5)
      }

      progress$close()
      return(i)
    })%...>%
      cat(.,"\n")

    Sys.sleep(1)

    progress2 = AsyncProgress$new(message="Complex analysis")

    future({
      for (i in 1:5) {
        progress2$inc(1/5)
        Sys.sleep(0.5)
      }

      progress2$close()

      return(i)
    })%...>%
      cat(.,"\n")

    NULL
  })
}

shinyApp(ui = ui, server = server)

Ваш код адаптирован

Вот код, который вы написали, слегка модифицированный для выделения многих асинхронных процессов.Любая работа может быть выполнена на рабочем месте, например, созданный вами вектор и добавление rnorm тоже.(Здесь не показано)

library(shiny)
library(future)
library(promises)
library(ipc)

plan(multisession)

ui <- fluidPage(
  actionButton(inputId = "go", label = "Launch calculation")
)

server <- function(input, output, session) {

  observeEvent(input$go, {
    Runs=c(1:4) #define the number of runs
    progress = list() #A list to maintain progress for each run

    for(j in Runs){
      progress[[j]] = AsyncProgress$new(message="Complex analysis")
      future({
        for (i in 1:10) {
          progress[[j]]$inc(1/10)
          Sys.sleep(0.2)
        }
        progress[[j]]$close()
        return(i)
    })%...>%
        cat(.,'\n')
    }

    NULL
  })
}

shinyApp(ui = ui, server = server)

Приведенный выше код является модифицированной версией кода, найденного в документации по ipc здесь:

http://htmlpreview.github.io/?https://github.com/fellstat/ipc/blob/master/inst/doc/shinymp.html

Дополнительные ресурсы:

https://rstudio.github.io/promises/articles/overview.html

...