Есть ли способ использовать period.apply с doParallel и foreach в xts? - PullRequest
0 голосов
/ 27 апреля 2018

Я бы хотел распараллелить функцию period.apply в R, я пытаюсь использовать doParallel с Foreach, но я не знаю, как я мог бы реализовать эту функцию. Данные, которые я использую, это xts объект с индексом даты и времени и значениями переменной, и я пытаюсь сделать среднее значение данных каждые 5 секунд:

                                     VAR
2018-01-01 00:00:00                1945.054
2018-01-01 00:00:02                1944.940
2018-01-01 00:00:05                1945.061
2018-01-01 00:00:07                1945.255
2018-01-01 00:00:10                1945.007
2018-01-01 00:00:12                1944.995

Вот пример кода, который я написал, но он не работает:

library(xts)
library(doParallel)
library(foreach)

cores <- detectCores()
cluster <- makeCluster(cores, type = "PSOCK")
registerDoParallel(cluster)

ends <- endpoints(x,"secs",5)
m <- foreach(i = 1:length(index(x))) %dopar% period.apply(x,ends,mean)
index(m) <- foreach(m) %dopar% trunc(index(m),"secs")
stopCluster()

Вот код, который работает, но для гораздо большей базы данных это занимает слишком много времени:

ends <- endpoints(x,"secs",5)
m <- period.apply(x, ends, mean)
index(m) <- trunc(index(m),"secs")

Есть ли способ сделать это более эффективно?

Заранее спасибо.

Ответы [ 2 ]

0 голосов
/ 12 мая 2018

Я был действительно подавлен работой period.apply(), показанной в этом вопросе. Моя депрессия стала навязчивой идеей, чтобы сделать это быстрее. Поэтому я переписал его на C. Вот пример, который использует его и показывает улучшение производительности.

library(xts)  # need the GitHub development version
period_apply <- xts:::period_apply  # not exported

set.seed(21)
x <- .xts(rnorm(1e7), 1:1e7)
e <- endpoints(x, "seconds", 5)

system.time(y <- period.apply(x, e, sum))  # current version
#    user  system elapsed 
#  77.904   0.368  78.462 
system.time(z <- period_apply(x, e, sum))  # new C version
#    user  system elapsed 
#  15.468   0.232  15.741
all.equal(y, z)
# [1] TRUE

Так что в этом примере это примерно в 5 раз быстрее. Есть еще несколько вещей, которые могли бы сделать это еще быстрее, но 5x было хорошим местом, чтобы остановиться и показать, что это может быть лучше. Проверьте последнюю версию , если хотите (и достаточно смелы), чтобы попробовать.

0 голосов
/ 29 апреля 2018

Вы пробовали свой код на простом наборе данных? Потому что, как только я запустил его, он выполнял всю работу несколько раз (по одному разу для каждой строки в x). Кроме того, если вы пытаетесь распараллелить работу, обычно хорошей идеей является дать «работнику» как можно больше работы, прежде чем отправлять данные обратно. В вашем коде у вас есть два последовательных вызова foreach, которые приводят к дополнительным накладным расходам на связь.

Мой подход такой:

  1. Разделите объект xts на N джонки, убедившись, что мы разделяемся с одним из 5-секундных интервалов.
  2. Пусть каждый работник выполняет всю работу за один кусок.
  3. Объедините результаты. Как выбрать N?

Так как split.xts используется для первого шага, каждый блок будет иметь одинаковое количество интервалов 5 с. Однако объем выполняемой работы зависит (вероятно) в большей степени от количества точек данных, чем от количества интервалов 5 с. Поэтому, если распределение точек между этими чанками неравномерно, может иметь смысл использовать большее количество чанков вместе с некоторой балансировкой нагрузки. Если распределение точек равномерное, имеет смысл сделать N максимально большим, чтобы минимизировать накладные расходы на связь. Здесь я выбрал последний подход, то есть установил N равным количеству ядер.

Теперь давайте сгенерируем пример данных и применим ваше рабочее решение:

library(xts)
x <- xts(x = runif(100),
         order.by = as.POSIXct("2018-01-01") + 0:99)

ends <- endpoints(x,"secs",5)
m <- period.apply(x, ends, mean)
index(m) <- trunc(index(m),"secs")

Далее мы настраиваем параллельный кластер:

library(doParallel)
library(foreach)

cores <- detectCores()
cluster <- makeCluster(cores, type = "PSOCK")
registerDoParallel(cluster)

Теперь мы должны разделить объект xts. Здесь я сначала определяю временной интервал всего объекта и распределяю его по интервалам N 5 с.

N <- cores
k <- as.integer(ceiling(difftime(max(index(x)), min(index(x)), units = "secs") / (5 * N)))

Затем я разбил объект xts на список xts объектов, каждый из которых имеет примерно одинаковую длину:

split_x <- split(x, f = "secs", k = 5 * k)

Теперь я позволю foreach перебрать эти куски и объединить результаты:

m2 <- foreach(x = split_x, .packages = c("xts"), .combine = c) %dopar% {
    ends <- endpoints(x,"secs",5)
    m <- period.apply(x, ends, mean)
    index(m) <- trunc(index(m),"secs")
    m
}
stopCluster(cluster)

Ура, результаты равны:

all.equal(m, m2)
#> [1] TRUE
...