Используя кластер ветвления, возвращаемый parallel::makeForkCluster()
, я пытаюсь передать данные между процессами. Пока что у меня есть способность общаться между родительским и любым дочерним процессами.
send_to_node <- local({
assign_gobal <- function(n, v) {
assign(n, v, envir = .GlobalEnv)
NULL
}
function(dat, node, name) node_call(node, assign_gobal, name, dat)
})
retrieve_from_node <- function(node, name) {
node_call(node, get, name, envir = .GlobalEnv)
}
node_call <- function(node, fun, ...) {
serialize(
list(
type = "EXEC",
data = list(fun = fun, args = list(...), return = TRUE, tag = NULL)
), node$con, xdr = FALSE
)
unserialize(node$con)
}
cl <- parallel::makeForkCluster(nnodes = 2L)
x <- "hello world"
send_to_node(x, cl[[1]], "x")
retrieve_from_node(cl[[1]], "x")
parallel::stopCluster(cl)
По аналогии с send_to_node()
, описанным выше, я пытаюсь реализовать функцию node_to_node()
, которая может отправлять объект из одного дочернего процесса в другой, не проходя через главный процесс. Однако то, что я имею до сих пор, не работает.
node_to_node <- local({
send <- function(local_name, node, remote_name) {
send_to_node(get(local_name, envir = .GlobalEnv), node, remote_name)
}
function(source_node, source_name, dest_node, dest_name = source_name) {
node_call(source_node, send, source_name, dest_node, dest_name)
}
})
cl <- parallel::makeForkCluster(nnodes = 2L)
x <- "hello world"
send_to_node(x, cl[[1]], "x")
node_to_node(cl[[1]], "x", cl[[2]])
retrieve_from_node(cl[[1]], "x")
retrieve_from_node(cl[[2]], "x")
parallel::stopCluster(cl)
Любая помощь очень ценится.