Какой самый простой способ распараллелить векторизованную функцию в R? - PullRequest
15 голосов
/ 06 апреля 2011

У меня очень большой список X и векторизованная функция f. Я хочу рассчитать f(X), но это займет много времени, если я сделаю это с одним ядром. У меня есть (доступ к) 48-ядерный сервер. Какой самый простой способ распараллелить вычисление f(X)? Ниже не правильный ответ:

library(foreach)
library(doMC)
registerDoMC()

foreach(x=X, .combine=c) %dopar% f(x)

Приведенный выше код действительно распараллелит вычисление f(X), но это будет сделано путем применения f отдельно к каждому элементу X. Это игнорирует векторизованную природу f и, вероятно, сделает медленнее в результате, а не быстрее. Вместо того, чтобы применять f поэлементно к X, я хочу разделить X на куски разумного размера и применить к ним f.

Итак, я должен просто вручную разделить X на 48 одинаковых по размеру подсписков и , затем , применить f к каждому параллельно, а затем вручную собрать результат? Или для этого предназначен пакет?

Если кому-то интересно, мой конкретный пример использования здесь .

Ответы [ 5 ]

5 голосов
/ 11 августа 2011

Хотя это старый вопрос, он может быть интересен всем, кто сталкивался с этим через Google (например, я): посмотрите на функцию pvec в пакете multicore. Я думаю, что это именно то, что вы хотите.

4 голосов
/ 13 апреля 2011

Вот моя реализация.Это функция chunkmap, которая принимает векторизованную функцию, список аргументов, которые должны быть векторизованы, и список аргументов, которые не должны быть векторизованы (т.е. константы), и возвращает тот же результат, что и вызов функции для аргументов напрямую,за исключением того, что результат рассчитывается параллельно.Для функции f, векторных аргументов v1, v2, v3 и скалярных аргументов s1, s2 следующие должны возвращать идентичные результаты:

f(a=v1, b=v2, c=v3, d=s1, e=s2)
f(c=v3, b=v2, e=s2, a=v1, d=s1)
chunkapply(FUN=f, VECTOR.ARGS=list(a=v1, b=v2, c=v3), SCALAR.ARGS=list(d=s1, e=s2))
chunkapply(FUN=f, SCALAR.ARGS=list(e=s2, d=s1), VECTOR.ARGS=list(a=v1, c=v3, b=v2))

Так какфункция chunkapply не может знать, какие аргументы f векторизованы, а какие нет, вы сами можете указать, когда вы ее вызываете, иначе вы получите неправильные результаты.Как правило, вы должны указывать свои аргументы, чтобы убедиться, что они связаны правильно.

library(foreach)
library(iterators)
# Use your favorite doPar backend here
library(doMC)
registerDoMC()

get.chunk.size <- function(vec.length,
                           min.chunk.size=NULL, max.chunk.size=NULL,
                           max.chunks=NULL) {
  if (is.null(max.chunks)) {
    max.chunks <- getDoParWorkers()
  }
  size <- vec.length / max.chunks
  if (!is.null(max.chunk.size)) {
    size <- min(size, max.chunk.size)
  }
  if (!is.null(min.chunk.size)) {
    size <- max(size, min.chunk.size)
  }
  num.chunks <- ceiling(vec.length / size)
  actual.size <- ceiling(vec.length / num.chunks)
  return(actual.size)
}

ichunk.vectors <- function(vectors=NULL,
                           min.chunk.size=NULL,
                           max.chunk.size=NULL,
                           max.chunks=NULL) {
  ## Calculate number of chunks
  recycle.length <- max(sapply(vectors, length))
  actual.chunk.size <- get.chunk.size(recycle.length, min.chunk.size, max.chunk.size, max.chunks)
  num.chunks <- ceiling(recycle.length / actual.chunk.size)

  ## Make the chunk iterator
  i <- 1
  it <- idiv(recycle.length, chunks=num.chunks)
  nextEl <- function() {
    n <- nextElem(it)
    ix <- seq(i, length = n)
    i <<- i + n
    vchunks <- foreach(v=vectors) %do% v[1+ (ix-1) %% length(v)]
    names(vchunks) <- names(vectors)
    vchunks
  }
  obj <- list(nextElem = nextEl)
  class(obj) <- c("ichunk", "abstractiter", "iter")
  obj
}

chunkapply <- function(FUN, VECTOR.ARGS, SCALAR.ARGS=list(), MERGE=TRUE, ...) {
  ## Check that the arguments make sense
  stopifnot(is.list(VECTOR.ARGS))
  stopifnot(length(VECTOR.ARGS) >= 1)
  stopifnot(is.list(SCALAR.ARGS))
  ## Choose appropriate combine function
  if (MERGE) {
    combine.fun <- append
  } else {
    combine.fun <- foreach:::defcombine
  }
  ## Chunk and apply, and maybe merge
  foreach(vchunk=ichunk.vectors(vectors=VECTOR.ARGS, ...),
          .combine=combine.fun,
          .options.multicore = mcoptions) %dopar%
  {
    do.call(FUN, args=append(vchunk, SCALAR.ARGS))
  }
}

## Only do chunkapply if it will run in parallel
maybe.chunkapply <- function(FUN, VECTOR.ARGS, SCALAR.ARGS=list(), ...) {
  if (getDoParWorkers() > 1) {
    chunkapply(FUN, VECTOR.ARGS, SCALAR.ARGS, ...)
  } else {
    do.call(FUN, append(VECTOR.ARGS, SCALAR.ARGS))
  }
}

Вот несколько примеров, показывающих, что chunkapply(f,list(x)) дает результаты, идентичные f(x).Я установил max.chunk.size очень маленьким, чтобы гарантировать, что алгоритм чанкинга действительно используется.

> # Generate all even integers from 2 to 100 inclusive
> identical(chunkapply(function(x,y) x*y, list(1:50), list(2), max.chunk.size=10), 1:50 * 2)
[1] TRUE

> ## Sample from a standard normal distribution, then discard values greater than 1
> a <- rnorm(n=100)
> cutoff <- 1
> identical(chunkapply(function(x,limit) x[x<=limit], list(x=a), list(limit=cutoff), max.chunk.size=10), a[a<cutoff])
[1] TRUE

Если у кого-то есть имя лучше, чем "chunkapply", пожалуйста, предложите его.

Edit:

Как указывает другой ответ, в многоядерном пакакге есть функция pvec, которая очень похожа на ту, что я написал.Для простых случаев, вы должны нам это, и вы должны проголосовать за ответ Джонаса Рауха.Тем не менее, моя функция немного более общая, поэтому, если к вам применимо любое из следующего, вы можете вместо этого использовать мою функцию:

  • Вам нужно использовать параллельный бэкэнд, отличный от многоядерного (например, MPI).Моя функция использует foreach, поэтому вы можете использовать любую инфраструктуру распараллеливания, которая предоставляет бэкэнд для foreach.
  • Вам необходимо передать несколько векторизованных аргументов.pvec векторизует только один аргумент, поэтому вы не можете легко реализовать параллельное векторизованное сложение, например, с pvec.Моя функция позволяет указывать произвольные аргументы.
2 голосов
/ 14 марта 2013

Пакет itertools был разработан для решения подобных проблем. В этом случае я бы использовал isplitVector:

n <- getDoParWorkers()
foreach(x=isplitVector(X, chunks=n), .combine='c') %dopar% f(x)

В этом примере pvec, несомненно, быстрее и проще, но его можно использовать в Windows с пакетом doParallel, например.

0 голосов
/ 07 апреля 2011

Как насчет этого? R использует всю доступную память, а multicore распараллеливает все доступные ядра.

library(multicore)
result = mclapply(X, function,mc.preschedule=FALSE, mc.set.seed=FALSE)
0 голосов
/ 06 апреля 2011

Map-Reduce может быть тем, что вы ищете;это было портировано на R

...