Выполнить обнаружение точки останова (лм) параллельно в R - PullRequest
0 голосов
/ 07 октября 2018

Я делаю около 80000 вычислений обнаружения точек останова во временных рядах в R. У меня есть все эти чрезвычайно разные временные ряды, где я не могу применять модели ARIMA, поэтому я рассчитываю линейную модель для временных рядов, затем извлекаю точки останова и использую подобранные результаты.регрессии для расчета тренда, приходящего с последней точки останова.Example of one time series

В приведенном выше примере алгоритм обнаружит три точки останова (один наклон, один довольно плоский и один спад).Это идеально подходит для моих нужд, но выполнение вычислений 80k точек останова один раз в неделю - это слишком много, поэтому я пытаюсь реализовать это, используя параллельную обработку в R.

В этом примере (найти ссылку на данныениже) Я последовательно вычисляю точки останова, что занимает около 24 часов для всех 88 тыс.

df.subset <- read.csv("dfsubset.csv)"
start <- Sys.time()

All.Breakpoints <- df.subset %>%
nest(-CONC_ID) %>%
mutate(bps = map(data, ~breakpoints(ACT_QTY_new ~ Index, data = .)))

Sys.time() - start

В этом фрагменте кода я запускаю обнаружение для 10 временных рядов (на моем Mac), которое принимает 47 секунд .Я предполагаю, что распараллеливание должно сократить это время теста примерно до 1/4 времени.

Ниже я перечислил три способа, которыми я пытался распараллелить вычисления, но не могу получить вложенное применение для работы в параллельной настройке.

С параллельным пакетом

clus <- makeCluster(4)
clusterEvalQ(clus, {library(dplyr); library(tidyr); library(magrittr)})

myfunction <- function(df.subset) {
All.Breakpoints <- df.subset %>%
nest(-CONC_ID) %>%
mutate(bps = map(data, ~breakpoints(ACT_QTY_new ~ Index, data = .)))
return(All.Breakpoints)
}

clusterExport(clus, "myfunction")

do.call(bind_rows, parApply(clus, df.subset, 1,{function(r) { 
myfunction(r[1]) }}))

С пакетом multidplyr:

library(multidplyr)
cluster <- create_cluster(4)
set_default_cluster(cluster)

four <- function(x) {
All.Breakpoints <- x %>%
nest(-CONC_ID) %>%
mutate(bps = map(data, ~breakpoints(ACT_QTY_new ~ Index, data = .)))
return(All.Breakpoints)
}

cluster_assign_value(cluster, 'four', four)
save <- df.subset %>% partition(CONC_ID) %>% map(four(.))

С параллельным пакетом, но с другой группировкой

library(parallel)
cl <- detectCores()

group <- df.subset %>% group_by(CONC_ID) %>% group_indices
df.subset <- bind_cols(tibble(group), df.subset)

cluster <- create_cluster(cores = cl)

by_group <- df.subset %>%
partition(group, cluster = cluster)

by_group %>%
# Assign libraries
cluster_library("tidyr") %>%
cluster_library("dplyr") %>%
cluster_library("strucchange") %>%
cluster_library("purrr") %>%
# Assign values (use this to load functions or data to each core)
cluster_assign_value("df.subset", df.subset) 

cluster_eval(by_group, search())[[1]] # results for first cluster shown 
only
cluster_get(by_group, "df.subset")[[1]]

start <- proc.time() # Start clock
sp_500_processed_in_parallel <- by_group %>% # Use by_group party_df
group_by(CONC_ID) %>%
mutate(bps = map(data, ~breakpoints(ACT_QTY_new ~ Index, data = .))) 
%>%
collect() %>% # Special collect() function to recombine partitions
as_tibble()   # Convert to tibble
time_elapsed_parallel <- proc.time() - start # End clock
time_elapsed_parallel

Ссылка на файл:

http://www.filedropper.com/dfsubset

Я ценю ваши идеи и отзывы!

1 Ответ

0 голосов
/ 12 октября 2018

Задавая вопрос и описывая проблему, вы решите его большую часть времени ... Я понял, что mutate не работает нигде (, честно говоря, Stackoverflow ) параллельно в R.

Поэтому я перешел на использование do и распределил нагрузку через multidplyr и получил уменьшение времени вычислений примерно на 50% при переходе от 1 до 4 ядер и до 25% от общего времени при переходе от 1 до 8 ядер.

Код ниже.

## parallel
cl <- detectCores()
cl

df.cluster <- df.subset

cluster <- create_cluster(cores = cl)
cluster

by_group <- df.cluster %>%
partition(CONC_ID, cluster = cluster)
by_group

by_group %>%

# Assign libraries
cluster_library("strucchange")
cluster_eval(by_group, search())[[1]] # results for first cluster shown only

start <- proc.time() # Start clock

cluster.processed <- by_group %>%
                     do(model = breakpoints(ACT_QTY_new ~ Index, data = .)) %>%
                     collect()

time_elapsed_parallel <- proc.time() - start # End clock
time_elapsed_parallel

rm(by_grou)
gc()

Predictions <- cluster.processed %>%
mutate(SegmentedForecast = map(model, fitted))
df.fitted.vector <- as.data.frame(rowwise(Predictions[,3])) . 
...