Парализация различных экземпляров в классе R6 - PullRequest
1 голос
/ 25 апреля 2020

Я прошу помощи в разработке специфицированного класса c R6 и особенно в разработке метода run для параллельного запуска процессов. Обратите внимание, что весь приведенный ниже пример кода не был протестирован и, скорее всего, содержит ошибки. Они просто здесь, чтобы помочь донести, как я думаю о реализации распараллеливания заданий в моем R6Class.


Я создал объект типа R6class под названием Input, который является оболочкой для платформы моделирования. , Цель этого занятия - облегчить написание индивидуального набора параметров для входов в платформу моделирования. Это может выглядеть как

input = Input$new()
input$set_parameter_x(...)
input$set_parameter_y(...)

Мне бы хотелось, чтобы класс мог напрямую запускать симуляции (с помощью метода run) и делать это с определенным числом потоков, но я не уверен, как это сделать. лучше всего достичь этой цели. Обратите внимание, что каждый процесс, запущенный run, является однопоточным. Однако мне бы хотелось, чтобы каждый процесс, запущенный run, мог выполняться параллельно с вызовом метода run, сделанного из другого экземпляра класса Input. Нечто подобное

input_1$run(executable = "/path/to/executable", maxNbThreads = 4)
input_2$run(executable = "/path/to/executable", maxNbThreads = 4)
input_3$run(executable = "/path/to/executable", maxNbThreads = 4)
input_4$run(executable = "/path/to/executable", maxNbThreads = 4)

будет работать параллельно. Я не знаю много о парализации в R (отсюда и мой вопрос), но, конечно, вместо этого можно было бы сделать

foreach (input_index = 1:nbInputs) %dopar%
{
    require(myPackage)
    input = Input$new()
    input$set_parameter_x(...)
    input$set_parameter_y(...)
    input$run(executable = "/path/to/executable", maxNbThreads = 1)
}

, но я бы сказал sh, что работа по распараллеливанию процессов будет учтена учетная запись R6class Input, а не пользователем этого класса.

Я думаю о том, чтобы вектор с именем runningThreads был общим для всех экземпляров класса (атрибут stati c класса ) с использованием среды (как объяснено здесь ). runningThreads будет содержать pid текущих запущенных заданий. Затем каждый раз, когда вызывается метод run, пользователь указывает максимальное количество потоков (maxNbThreads) и через некоторое время l oop удаляет из runningThreads список рабочих мест, которые больше не активны. пока длина runningThreads не станет меньше аргумента maxNbThreads, предоставленного run. run запускает задание и добавляет его pid к runningThreads. Метод publi c run (и частные методы, которые вызовет run) может выглядеть примерно так:

isAThreadAvailable = function(maxNbThreads)
{
    for (thread_index in 1:length(private$runningThreads))
    {
        thread = private$runningThreads[thread_index]
        if (!isThreadRunning(thread))
        {
            private$runningThreads = private$runningThreads[-thread_index]
        }
    }

    return (length(private$runningThreads) <  maxNbThreads)
}


isThreadRunning = function(thread)
{
    return (system(paste("kill -0", pid), intern=TRUE))
}

run = function(exec = defaultExecutable, maxNbThreads = 1, sleepTimeInSec = 2)
{
    stopifnot(maxNbThreads > 0)

    if (maxNbThreads == 1)
    {
        # Then just run it and wait for it to end
        system(paste(exec, paste(private$data, collapse=" ")))
    } else
    {
        while (!isAThreadAvailable(maxNbThreads))
        {
            Sys.sleep(sleepTimeInSec)
        }

        newThread = system(paste(exec, paste(private$data, collapse=" "), "&; echo !#"), intern=TRUE)

        private$runningThreads = c(private$runningThreads, newThread)
    }
}

Похоже ли это на хороший метод? Вероятно, есть пакеты, которые могут облегчить сборку моего R6class. Не могли бы вы указать мне на эти пакеты и, возможно, показать небольшой пример того, как его можно использовать для моего run метода в моем R6Class?

1 Ответ

0 голосов
/ 25 апреля 2020

Благодаря пакету processx, который @HongOoi упомянул в комментарии, я смог реализовать то, что мне было нужно. Я сделал в очень похожем стиле, как и раньше, но пакет processx сделал все намного проще.

Вот код для run метода

 isAThreadAvailable = function(maxNbThreads)
    {
        while (private$shared$isOtherProcessCheckingThreads)
        {
            Sys.sleep(0.001)
        }
        private$shared$isOtherProcessCheckingThreads = TRUE

        thread_index = 1
        while (thread_index <= length(private$shared$runningThreads))
        {
            thread = private$shared$runningThreads[[thread_index]]
            if (!thread$is_alive())
            {
                private$shared$runningThreads = private$shared$runningThreads[-thread_index]
            } else
            {
                thread_index = thread_index + 1
            }
        }

        r = length(private$shared$runningThreads) < maxNbThreads
        private$shared$isOtherProcessCheckingThreads = FALSE
        return(r)
    },


    run = function(exec = "SimBit", maxNbThreads = 1, sleepTimeInSec = 1, waitEndOfThread = FALSE)
    {
        stopifnot(maxNbThreads > 0)
        stopifnot(sleepTimeInSec >= 0)

        while (!self$isAThreadAvailable(maxNbThreads))
        {
            Sys.sleep(sleepTimeInSec)
        }

        newThread = processx::process$new(exec, paste(private$data, collapse=" "))


        private$shared$runningThreads = c(private$shared$runningThreads, newThread)

        if (waitEndOfThread)
        {
            while (newThread$is_alive())
            {
                Sys.sleep(sleepTimeInSec)
            }
        }
    }
...