Распараллеливание с data.table - PullRequest
0 голосов
/ 26 апреля 2018

У меня следующая проблема. У меня есть кусочно-линейная функция, описываемая (xPoints, yPoints), и я хочу быстро вычислять - я должен делать это снова и снова - подразумеваемое значение y для длинного списка x, где x может выйти за пределы диапазон xPoints. Я кодировал функцию f_pwl, которая вычисляет подразумеваемое значение y, но она медленная, поэтому я пытался распараллелить ее вызов. Но на самом деле это медленнее, чем использование data.table: = синтаксис. Я буду благодарен за предложения ускорить процесс либо улучшив мою функцию f_pwl, либо внедрив эффективное распараллеливание, так как у меня есть доступ к 20 ядрам для ускорения.

Вот пример кода.

    # libraries
    require(data.table) # for fread, work with large data
    require(abind)      # for abind()
    require(foreach) # for parallel processing, used with doParallel
    require(doParallel) # for parallel processing, used with foreach

    f_pwl <- function(x)  {
      temp <- as.vector( rep(NA, length = length(x)), mode = "double" )
      for (i in seq(from = 1, to = length(x), by = 1))  {
        if (x[i] > max(xPoints) | x[i] < min(xPoints)) {
          # nothing to do, temp[i] <- NA
        } else if (x[i] == max(xPoints)) {
          # value equal max(yPoints)
          temp[i] <- max(yPoints)
        } else {
          # value is f_pwl(x)
          xIndexVector = as.logical( x[i] >= xPoints  &  abind(xPoints[2:length(xPoints)], max(xPoints)) > x[i] )
          xIndexVector_plus1 = shift( xIndexVector, n = 1, fill = FALSE, type = "lag" )
          alpha_j = (xPoints[xIndexVector_plus1] - x[i])/(xPoints[xIndexVector_plus1] - xPoints[xIndexVector])
          temp[i] <- alpha_j %*% yPoints[xIndexVector] + (1-alpha_j) %*% yPoints[xIndexVector_plus1]
        }
      } # end for i
      as.vector( temp, mode = "double" )
    }


    ## Main program
    xPoints <- c(4, 9, 12, 15, 18, 21)
    yPoints <- c(1, 2, 3, 4, 5, 6)

    x <- rnorm(1e4, mean = 12, sd = 5)

    dt <- as.data.table( x )
    dt[ , c("y1", "y2", "y3") := as.vector( mode = "double", NA ) ]

    # data.table := command
    system.time({
      dt[, y2 := f_pwl( x ) ]
    })

    # mapply
    system.time({
      dt[ , y1 := mapply( f_pwl, x ), by=.I  ]
    })

    # parallel
    system.time({
      #setup parallel backend to use many processors
      cores=detectCores()
      cl <- makeCluster(cores[1]-1, type="FORK") #not to overload your computer
      registerDoParallel(cl)
      dt$y3 <- foreach(i=1:nrow(dt), .combine=cbind) %dopar% {
        tempY <- f_pwl( dt$x[i] )
        tempY
      }
      #stop cluster
      stopCluster(cl)
    })

    summary( dt[ , .(y1-y2, y1-y3, y2-y3)] )   

1 Ответ

0 голосов
/ 27 апреля 2018

Сначала вычислите и сохраните значения alpha_j.

Затем сначала выполните сортировку DT по x и нарежьте ее на соответствующие интервалы перед выполнением линейной интерполяции

alpha <- c(NA, diff(yPoints) / diff(xPoints))

DT[order(x), 
    y := alpha[.GRP] * (x - xPoints[.GRP-1L]) + yPoints[.GRP-1L], 
    by=cut(x, xPoints)]

Пожалуйста, дайте мне знать, как это работает.

данные:

library(data.table)

## Main program
set.seed(27L)
xPoints <- c(4, 9, 12, 15, 18, 21)
yPoints <- c(1, 2, 3, 4, 5, 6)
DT <- data.table(x=rnorm(1e4, mean=12, sd=5))

проверка:

f_pwl <- function(x)  {
    temp <- as.vector( rep(NA, length = length(x)), mode = "double" )
    for (i in seq(from = 1, to = length(x), by = 1))  {
        if (x[i] > max(xPoints) | x[i] < min(xPoints)) {
            # nothing to do, temp[i] <- NA
        } else if (x[i] == max(xPoints)) {
            # value equal max(yPoints)
            temp[i] <- max(yPoints)
        } else {
            # value is f_pwl(x)
            xIndexVector = as.logical( x[i] >= xPoints  &  abind(xPoints[2:length(xPoints)], max(xPoints)) > x[i] )
            xIndexVector_plus1 = shift( xIndexVector, n = 1, fill = FALSE, type = "lag" )
            alpha_j = (xPoints[xIndexVector_plus1] - x[i])/(xPoints[xIndexVector_plus1] - xPoints[xIndexVector])
            temp[i] <- alpha_j %*% yPoints[xIndexVector] + (1-alpha_j) %*% yPoints[xIndexVector_plus1]
        }
    } # end for i
    as.vector( temp, mode = "double" )
}
system.time({
    DT[, yOP := f_pwl( x ) ]
})

DT[abs(y-yOP) > 1e-6]
#Empty data.table (0 rows) of 3 cols: x,y,yOP
...