R вложенный цикл foreach - PullRequest
       6

R вложенный цикл foreach

0 голосов
/ 20 февраля 2019

У меня есть входной набор данных:

# environment
require(pacman)

p_load(
  data.table
  , doParallel
  , foreach
)


doParallel::registerDoParallel(makeCluster(4))

# create input
runDT <- data.table(run = c(F,T,F,T)
                    , input1 = 1:4
                    , run_id = 1:4)
print(runDT)
     run input1 run_id
1: FALSE      1      1
2:  TRUE      2      2
3: FALSE      3      3
4:  TRUE      4      4

, и это еще один необработанный набор данных:

dataDT <- data.table(
  ID = 1:4
  , c1 = c(1:4))
print(dataDT)
   ID c1
1:  1  1
2:  2  2
3:  3  3
4:  4  4

Я хотел бы запустить вложенные циклы foreach, но он выдает мне ошибку:

# run
row_run <- runDT[run == T, run_id]

resultsDT <- foreach::foreach(
  k = 1:length(row_run), .inorder = FALSE, .packages = c("data.table")) %dopar% {

    # get the input for this run
    inputDT <- runDT[run_id == row_run[k],]

    # apply the input for all dataDT rows
    result_run <- foreach::foreach(
      j = 1:nrow(dataDT), .inorder = FALSE, .packages = c("data.table")) %dopar% {

        dataDT_run <- dataDT[ID == j,]
        dataDT_run[, c("o1", "run_id") := list(
          c1 + inputDT[, input1]
          , inputDT[, run_id]
        )]
        return(dataDT_run[, c("o1", "run_id"), with = FALSE])
      }
    result_run <- rbindlist(result_run)
    return(result_run)
  }
Error in { : task 1 failed - "could not find function "%dopar%""
resultsDT <- rbindlist(resultsDT)
print(resultsDT)

Результат, который я ожидаю увидеть:

resultsDT <- data.table(
  o1 = c((1:4) + 2,c(1:4) + 4)
  , run_id = c(rep(2,4),rep(4,4))
)
print(resultsDT)
   o1 run_id
1:  3      2
2:  4      2
3:  5      2
4:  6      2
5:  5      4
6:  6      4
7:  7      4
8:  8      4

Затем я изменил первый %dopar% на %:%, но он дает другую ошибку:

Error in foreach::foreach(k = 1:length(row_run), .inorder = FALSE, .packages = c("data.table")) %:%  : 
  no function to return from, jumping to top level

Как это исправить?

Ответы [ 2 ]

0 голосов
/ 09 мая 2019

Но если мы сделаем это таким образом, будет ли runDT копироваться в RAM k * j раз?Поскольку мой фактический runDT довольно большой.

Я отвечу на ваш дополнительный вопрос

doParallel::registerDoParallel(makeCluster(4))

Когда вы создадите 4 кластера, runDT будет скопирован в ваш 4 кластера.

 inputDT <- runDT[run_id == row_run[k],]

Кроме того, примите k*j как 8 и все inputDT размеры 100MB.

size(Cluster1) : runDT + inputDT(100MB) + inputDT(100MB) + etc
size(Cluster2) : runDT + inputDT(100MB) + inputDT(100MB) + etc
size(Cluster3) : runDT + inputDT(100MB) + inputDT(100MB) + etc
size(Cluster4) : runDT + inputDT(100MB) + inputDT(100MB) + etc
0 голосов
/ 20 февраля 2019

Исправлено .. Кажется, мне придется поместить inputDT <- runDT[run_id == row_run[k],] внутри цикла:

resultsDT <- foreach::foreach(
  k = 1:length(row_run), .inorder = FALSE, .packages = c("data.table"), .combine = 'rbind') %:%
    # apply the input for all dataDT rows
    foreach::foreach(
      j = 1:nrow(dataDT), .combine = 'rbind') %dopar% {

        # get the input for this run
        inputDT <- runDT[run_id == row_run[k],]

        dataDT_run <- dataDT[ID == j,]
        dataDT_run[, c("o1", "run_id") := list(
          c1 + inputDT[, input1]
          , inputDT[, run_id]
        )]
        return(dataDT_run[, c("o1", "run_id"), with = FALSE])
}
print(resultsDT)
   o1 run_id
1:  3      2
2:  4      2
3:  5      2
4:  6      2
5:  5      4
6:  6      4
7:  7      4
8:  8      4

Но если мы сделаем это таким образом, runDT копируется в RAM k * j раз?Потому что мой фактический runDT довольно большой.

...