Я прошу помощи в разработке специфицированного класса c R6 и особенно в разработке метода run
для параллельного запуска процессов. Обратите внимание, что весь приведенный ниже пример кода не был протестирован и, скорее всего, содержит ошибки. Они просто здесь, чтобы помочь донести, как я думаю о реализации распараллеливания заданий в моем R6Class.
Я создал объект типа R6class под названием Input
, который является оболочкой для платформы моделирования. , Цель этого занятия - облегчить написание индивидуального набора параметров для входов в платформу моделирования. Это может выглядеть как
input = Input$new()
input$set_parameter_x(...)
input$set_parameter_y(...)
Мне бы хотелось, чтобы класс мог напрямую запускать симуляции (с помощью метода run
) и делать это с определенным числом потоков, но я не уверен, как это сделать. лучше всего достичь этой цели. Обратите внимание, что каждый процесс, запущенный run
, является однопоточным. Однако мне бы хотелось, чтобы каждый процесс, запущенный run
, мог выполняться параллельно с вызовом метода run
, сделанного из другого экземпляра класса Input
. Нечто подобное
input_1$run(executable = "/path/to/executable", maxNbThreads = 4)
input_2$run(executable = "/path/to/executable", maxNbThreads = 4)
input_3$run(executable = "/path/to/executable", maxNbThreads = 4)
input_4$run(executable = "/path/to/executable", maxNbThreads = 4)
будет работать параллельно. Я не знаю много о парализации в R (отсюда и мой вопрос), но, конечно, вместо этого можно было бы сделать
foreach (input_index = 1:nbInputs) %dopar%
{
require(myPackage)
input = Input$new()
input$set_parameter_x(...)
input$set_parameter_y(...)
input$run(executable = "/path/to/executable", maxNbThreads = 1)
}
, но я бы сказал sh, что работа по распараллеливанию процессов будет учтена учетная запись R6class Input
, а не пользователем этого класса.
Я думаю о том, чтобы вектор с именем runningThreads
был общим для всех экземпляров класса (атрибут stati c класса ) с использованием среды (как объяснено здесь ). runningThreads
будет содержать pid текущих запущенных заданий. Затем каждый раз, когда вызывается метод run
, пользователь указывает максимальное количество потоков (maxNbThreads
) и через некоторое время l oop удаляет из runningThreads
список рабочих мест, которые больше не активны. пока длина runningThreads
не станет меньше аргумента maxNbThreads
, предоставленного run
. run
запускает задание и добавляет его pid к runningThreads
. Метод publi c run
(и частные методы, которые вызовет run
) может выглядеть примерно так:
isAThreadAvailable = function(maxNbThreads)
{
for (thread_index in 1:length(private$runningThreads))
{
thread = private$runningThreads[thread_index]
if (!isThreadRunning(thread))
{
private$runningThreads = private$runningThreads[-thread_index]
}
}
return (length(private$runningThreads) < maxNbThreads)
}
isThreadRunning = function(thread)
{
return (system(paste("kill -0", pid), intern=TRUE))
}
run = function(exec = defaultExecutable, maxNbThreads = 1, sleepTimeInSec = 2)
{
stopifnot(maxNbThreads > 0)
if (maxNbThreads == 1)
{
# Then just run it and wait for it to end
system(paste(exec, paste(private$data, collapse=" ")))
} else
{
while (!isAThreadAvailable(maxNbThreads))
{
Sys.sleep(sleepTimeInSec)
}
newThread = system(paste(exec, paste(private$data, collapse=" "), "&; echo !#"), intern=TRUE)
private$runningThreads = c(private$runningThreads, newThread)
}
}
Похоже ли это на хороший метод? Вероятно, есть пакеты, которые могут облегчить сборку моего R6class. Не могли бы вы указать мне на эти пакеты и, возможно, показать небольшой пример того, как его можно использовать для моего run
метода в моем R6Class?