Обмен данными между процессами в кластере вил (из параллельного пакета R) - PullRequest
0 голосов
/ 16 января 2019

Используя кластер ветвления, возвращаемый parallel::makeForkCluster(), я пытаюсь передать данные между процессами. Пока что у меня есть способность общаться между родительским и любым дочерним процессами.

send_to_node <- local({
  assign_gobal <- function(n, v) {
    assign(n, v, envir = .GlobalEnv)
    NULL
  }
  function(dat, node, name) node_call(node, assign_gobal, name, dat)
})

retrieve_from_node <- function(node, name) {
  node_call(node, get, name, envir = .GlobalEnv)
}

node_call <- function(node, fun, ...) {
  serialize(
    list(
      type = "EXEC",
      data = list(fun = fun, args = list(...), return = TRUE, tag = NULL)
    ), node$con, xdr = FALSE
  )
  unserialize(node$con)
}

cl <- parallel::makeForkCluster(nnodes = 2L)

x <- "hello world"
send_to_node(x, cl[[1]], "x")
retrieve_from_node(cl[[1]], "x")

parallel::stopCluster(cl)

По аналогии с send_to_node(), описанным выше, я пытаюсь реализовать функцию node_to_node(), которая может отправлять объект из одного дочернего процесса в другой, не проходя через главный процесс. Однако то, что я имею до сих пор, не работает.

node_to_node <- local({
  send <- function(local_name, node, remote_name) {
    send_to_node(get(local_name, envir = .GlobalEnv), node, remote_name)
  }
  function(source_node, source_name, dest_node, dest_name = source_name) {
    node_call(source_node, send, source_name, dest_node, dest_name)
  }
})

cl <- parallel::makeForkCluster(nnodes = 2L)

x <- "hello world"
send_to_node(x, cl[[1]], "x")

node_to_node(cl[[1]], "x", cl[[2]])
retrieve_from_node(cl[[1]], "x")
retrieve_from_node(cl[[2]], "x")

parallel::stopCluster(cl)

Любая помощь очень ценится.

...