R-Shiny App неправильно обновляет индикатор выполнения при использовании фьючерсов - PullRequest
1 голос
/ 05 июня 2019

У меня блестящее приложение, которое должно выполнять вычисления на нескольких ядрах, обеспечивая обратную связь через индикаторы выполнения. Это работает нормально, пока я не обработаю результаты фьючерсов далее (см. Рабочий пример ниже). Как только я использую результаты впоследствии, индикаторы выполнения не обновляются, пока все фьючерсы не будут сделаны.

Я использую пакеты future, promises и ipc для межпроцессного взаимодействия. Я думаю, что проблема в том, что R хочет продолжить работу с фьючерсами, как только будут получены результаты. Я пытался остановить алгоритм с помощью команд, таких как resolved () или resol (), но без какого-либо прогресса.

library(shiny)
library(future)
library(promises)
library(ipc)

plan(list(multiprocess, sequential))

ui <- fluidPage(
    actionButton(inputId = "go", label = "Launch calculation")
)

server <- function(input, output, session) {

    observeEvent(input$go, {
        x <- list()
        N = availableCores()
        Tasks = rep(10, N) #Number of sequential tasks per core

        progress = list() #A list to maintain progress for each run

        resultsvec <- c()
        for(j in 1:N){

            progress[[j]] = AsyncProgress$new(message = paste("analysis, core ",j))

            x[[j]] <- future({
                for(l in 1:Tasks[j]){
                    progress[[j]]$inc(1/Tasks[j])
                    resultsvec <- append(resultsvec, l)
                    Sys.sleep(1)
                }
                resultsvec
                progress[[j]]$close()
            })
        }
        result <- lapply(x, value)
        #... do stuff with result
    })
}

shinyApp(ui = ui, server = server)

Вот функция сервера, при которой индикаторы выполнения обновляются корректно.

server <- function(input, output, session) {

    observeEvent(input$go, {
        x <- list()
        N = availableCores()
        Tasks = rep(10, N) #Number of sequential tasks per core
        progress = list() #A list to maintain progress for each run

        for(j in 1:N){

            progress[[j]] = AsyncProgress$new(message = paste("analysis, core ",j))

            x[[j]] <- future({
                for(l in 1:Tasks[j]){
                    progress[[j]]$inc(1/Tasks[j])
                    Sys.sleep(1)
                }
                progress[[j]]$close()
            })
        }
    })
}

1 Ответ

0 голосов
/ 07 июня 2019

Мне удалось решить проблему для моих нужд, хотя решение больше не использует фьючерсы. Я переключился на пакет doSNOW. Но, насколько мне известно, в doSNOW или других параллельных пакетах, кроме future/promises, нет опции, которая разрешает межпроцессное взаимодействие. Так что это мой обходной путь. Я использовал один индикатор выполнения для всего процесса, в отличие от выше.

library(shiny)
library(doSNOW)

ui <- fluidPage(
    actionButton(inputId = "go", label = "Launch calculation")
)

server <- function(input, output, session) {

    observeEvent(input$go, {

        Tasks <- 40 #now total tasks to do
        runs <- 10 #splitting of progress bar. 10 means every 10% it gets updated. 20 every 5% etc.

        taskvec <- rep(Tasks %/% runs, runs)

        if (Tasks %% runs != 0){
            taskvec[1:(Tasks %% runs)] <- taskvec[1:(Tasks %% runs)] + 1
        }

        resultsvec <- c()

        cl <- makeCluster(2)
        registerDoSNOW(cl)

        withProgress(message = "Analysis", value = 0,{
            for (j in 1:runs) {

                resultsvec_sub <- foreach(i = 1:taskvec[j], 
                                          .combine = append) %dopar% {
                                              f <- i
                                              Sys.sleep(1)
                                              return(f)
                                          }
                resultsvec <- append(resultsvec, resultsvec_sub)
                incProgress(1/runs)
            }
        })
        stopCluster(cl)
        #do stuff with resultsvec..
    })
}

shinyApp(ui = ui, server = server)

Как вы можете видеть, я разделяю задачи перед тем, как назначить их ядрам, и обновляю индикатор выполнения, когда каждое разделение выполняется на всех ядрах. Это, конечно, более неэффективно, потому что, когда почти все задачи в пределах раскола выполнены. Некоторые ядра могут простаивать до тех пор, пока не будут созданы другие ядра и не начнется следующее разделение. Можно было бы улучшить процесс разделения / распределения задач, но он работает сейчас.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...