запись в глобальную среду при параллельной работе - PullRequest
6 голосов
/ 06 июня 2011

У меня есть data.frame ячеек, значений и координат. Он находится в глобальной среде.

> head(cont.values)
   cell value   x   y
1 11117    NA -34 322
2 11118    NA -30 322
3 11119    NA -26 322
4 11120    NA -22 322
5 11121    NA -18 322
6 11122    NA -14 322

Поскольку моей пользовательской функции требуется почти секунда для вычисления отдельной ячейки (и у меня есть десятки тысяч ячеек для вычисления), я не хочу дублировать вычисления для ячеек, которые уже имеют значение. Мое следующее решение пытается избежать этого. Каждая ячейка может быть рассчитана независимо, крича для параллельного выполнения.

Что моя функция на самом деле делает, так это проверяет, есть ли значение для указанного числа ячеек, и если оно NA, оно вычисляет его и вставляет вместо NA.

Я могу запустить свою магическую функцию (результат value для соответствующего cell), используя семейство функций apply, и из apply я могу читать и писать cont.values без проблем (это в глобальной среде) ).

Теперь я хочу запустить это параллельно (используя snowfall), и я не могу читать или записывать из / в эту переменную из отдельного ядра.

Вопрос: Какое решение сможет выполнять чтение / запись из / в динамическую переменную, находящуюся в глобальной среде, изнутри работника (ядра) при параллельном выполнении функции. Есть ли лучший способ сделать это?

Ответы [ 3 ]

4 голосов
/ 06 июня 2011

Шаблон центрального хранилища, с которым работники обращаются за значениями, реализован в пакете rredis в CRAN. Идея состоит в том, что сервер Redis поддерживает хранилище пар ключ-значение (ваш глобальный фрейм данных, повторно реализован). Рабочие опрашивают сервер, чтобы узнать, было ли рассчитано значение (redisGet), а если нет, то проведите расчет и сохраните его (redisSet), чтобы другие работники могли использовать его повторно. Рабочие могут быть R-сценариями, поэтому легко расширить рабочую силу. Это очень хорошая альтернативная параллельная парадигма. Вот пример, который использует понятие «запоминания» каждого результата. У нас есть функция, которая работает медленно (спит секунду)

fun <- function(x) { Sys.sleep(1); x }

Мы пишем «памятку», которая возвращает вариант fun, который сначала проверяет, было ли уже вычислено значение для x, и если да, то использует

memoize <-
    function(FUN)
{
    force(FUN) # circumvent lazy evaluation
    require(rredis)
    redisConnect()
    function(x)
    {
        key <- as.character(x)
        val <- redisGet(key)
        if (is.null(val)) {
            val <- FUN(x)
            redisSet(key, val)
        }
        val
    }
}

Затем мы запоминаем нашу функцию

funmem <- memoize(fun)

и иди

> system.time(res <- funmem(10)); res
   user  system elapsed 
  0.003   0.000   1.082 
[1] 10
> system.time(res <- funmem(10)); res
   user  system elapsed 
  0.001   0.001   0.040 
[1] 10

Для этого требуется сервер redis, работающий вне R, но очень простой в установке; см. документацию, прилагаемую к пакету rredis.

Параллельная версия в пределах R может быть

library(snow)
cl <- makeCluster(c("localhost","localhost"), type = "SOCK")
clusterEvalQ(cl, { require(rredis); redisConnect() })
tasks <- sample(1:5, 100, TRUE)
system.time(res <- parSapply(cl, tasks, funmem))
4 голосов
/ 06 июня 2011

Конечно, это будет зависеть от функции, о которой идет речь, но я боюсь, что snowfall не сильно поможет. Дело в том, что вам придется экспортировать весь фрейм данных в разные ядра (см. ?sfExport) и все же найти способ объединить его. Такой вид превосходит всю цель изменения значения в глобальной среде, поскольку вы, вероятно, хотите сохранить как можно меньше использования памяти.

Вы можете погрузиться в низкоуровневые функции snow, чтобы все это заработало. Смотрите следующий пример:

#Some data
Data <- data.frame(
  cell = 1:10,
  value = sample(c(100,NA),10,TRUE),
  x = 1:10,
  y = 1:10
)
# A sample function
sample.func <- function(){
    id <- which(is.na(Data$value)) # get the NA values

    # this splits up the values from the dataframe in a list
    # which will be passed to clusterApply later on.
    parts <- lapply(clusterSplit(cl,id),function(i)Data[i,c("x","y")])

    # Here happens the magic
    Data$value[id] <<-
    unlist(clusterApply(cl,parts,function(x){
        x$x+x$y
      }
    ))
}
#now we run it
require(snow)
cl <- makeCluster(c("localhost","localhost"), type = "SOCK")
sample.func()
stopCluster(cl)
> Data
   cell value  x  y
1     1   100  1  1
2     2   100  2  2
3     3     6  3  3
4     4     8  4  4
5     5    10  5  5
6     6    12  6  6
7     7   100  7  7
8     8   100  8  8
9     9    18  9  9
10   10    20 10 10

Вам все равно придется скопировать (частично) ваши данные, чтобы получить их до ядра. Но это все равно произойдет, когда вы вызовете snowfall высокоуровневые функции на фреймах данных, так как snowfall в любом случае использует низкоуровневую функцию snow.

Кроме того, не следует забывать, что если вы изменяете одно значение в кадре данных, весь кадр данных также копируется в память. Таким образом, вы не выиграете так много, добавив значения одно за другим, когда они возвращаются из кластера. Возможно, вы захотите попробовать несколько разных подходов, а также выполнить профилирование памяти.

1 голос
/ 06 июня 2011

Я согласен с Joris, что вам нужно будет скопировать ваши данные в другие ядра.С другой стороны, вам не нужно беспокоиться о том, что NA находится в данных или нет, в ядрах.Если ваш оригинальный data.frame называется cont.values:

nnaidx<-is.na(cont.values$value) #where is missing data originally
dfrnna<-cont.values[nnaidx,] #subset for copying to other cores
calcValForDfrRow<-function(dfrRow){return(dfrRow$x+dfrRow$y)}#or whatever pleases you
sfExport(dfrnna, calcValForDfrRow) #export what is needed to other cores
cont.values$value[nnaidx]<-sfSapply(seq(dim(dfrnna)[1]), function(i){calcValForDfrRow(dfrnna[i,])}) #sfSapply handles 'reordering', so works exactly as if you had called sapply

должен работать хорошо (за исключением опечаток)

...