Попытка этого кода
library(future)
library(foreach)
ncores <- 3
cl <- parallel::makeCluster(3)
avail <- bigstatsr::FBM(ncores, 1, type = "integer", init = 1)
doFuture::registerDoFuture()
res <- vector("list", 5)
for (i in seq_along(res)) {
while (sum(avail[]) == 0) {
cat("Waiting..\n")
Sys.sleep(0.5)
}
ind.avail <- which(avail[] == 1)
cat("Available:", length(ind.avail), "\n")
plan(cluster, workers = cl[ind.avail])
foo <- foreach(i = 3:1) %dopar% {
Sys.sleep(i)
}
print(one <- ind.avail[1])
avail[one] <- 0; print(avail[])
res[[i]] <- cluster(workers = cl[one], {
Sys.sleep(5)
avail[one] <- 1
i
})
}
sapply(res, resolved)
parallel::stopCluster(cl)
Ошибка, которую я получаю: Initialization of plan() failed, because the test future used for validation failed. The reason was: Unexpected result (of class ‘NULL’ != ‘FutureResult’) retrieved for ClusterFuture future (label = ‘<none>’, expression = ‘NA’)
.
Объяснение моего примера, пытающегося воспроизвести мою реальную проблему:
- Iмногократно (здесь 5) по двум шагам
- первый шаг легко распараллеливается с foreach
- второй шаг распараллелить нелегко и зависит от первого шага
Таким образом, моя идея состояла в том, чтобы распараллелить первый шаг по всем доступным кластерам и выполнить второй шаг асинхронно, используя только один кластер.Этот кластер больше не будет доступен, пока эта асинхронная работа не будет завершена.Тогда на следующем первом шаге будет доступно меньше кластера и так далее.Если для первого шага больше нет доступного кластера, он будет ожидать завершения некоторой асинхронной работы и освобождения некоторого кластера.