Вы пробовали свой код на простом наборе данных? Потому что, как только я запустил его, он выполнял всю работу несколько раз (по одному разу для каждой строки в x
). Кроме того, если вы пытаетесь распараллелить работу, обычно хорошей идеей является дать «работнику» как можно больше работы, прежде чем отправлять данные обратно. В вашем коде у вас есть два последовательных вызова foreach
, которые приводят к дополнительным накладным расходам на связь.
Мой подход такой:
- Разделите объект
xts
на N
джонки, убедившись, что мы разделяемся с одним из 5-секундных интервалов.
- Пусть каждый работник выполняет всю работу за один кусок.
- Объедините результаты. Как выбрать
N
?
Так как split.xts
используется для первого шага, каждый блок будет иметь одинаковое количество интервалов 5 с. Однако объем выполняемой работы зависит (вероятно) в большей степени от количества точек данных, чем от количества интервалов 5 с. Поэтому, если распределение точек между этими чанками неравномерно, может иметь смысл использовать большее количество чанков вместе с некоторой балансировкой нагрузки. Если распределение точек равномерное, имеет смысл сделать N
максимально большим, чтобы минимизировать накладные расходы на связь. Здесь я выбрал последний подход, то есть установил N
равным количеству ядер.
Теперь давайте сгенерируем пример данных и применим ваше рабочее решение:
library(xts)
x <- xts(x = runif(100),
order.by = as.POSIXct("2018-01-01") + 0:99)
ends <- endpoints(x,"secs",5)
m <- period.apply(x, ends, mean)
index(m) <- trunc(index(m),"secs")
Далее мы настраиваем параллельный кластер:
library(doParallel)
library(foreach)
cores <- detectCores()
cluster <- makeCluster(cores, type = "PSOCK")
registerDoParallel(cluster)
Теперь мы должны разделить объект xts
. Здесь я сначала определяю временной интервал всего объекта и распределяю его по интервалам N
5 с.
N <- cores
k <- as.integer(ceiling(difftime(max(index(x)), min(index(x)), units = "secs") / (5 * N)))
Затем я разбил объект xts
на список xts
объектов, каждый из которых имеет примерно одинаковую длину:
split_x <- split(x, f = "secs", k = 5 * k)
Теперь я позволю foreach
перебрать эти куски и объединить результаты:
m2 <- foreach(x = split_x, .packages = c("xts"), .combine = c) %dopar% {
ends <- endpoints(x,"secs",5)
m <- period.apply(x, ends, mean)
index(m) <- trunc(index(m),"secs")
m
}
stopCluster(cluster)
Ура, результаты равны:
all.equal(m, m2)
#> [1] TRUE