Ниже приведено время вычислений для подходов «IntervalSurgeon», «interval» и «data.table». Время указано для набора данных, содержащего один миллион идентификаторов, с 10 экспозициями на каждый идентификатор, то есть 10 миллионами строк. Я сделал один прогон из-за того, что подход «интервалов» занимает слишком много времени.
Машина была MacBook Pro (15 дюймов, 2018), с процессором Intel Core i9 2,9 ГГц, DDR4 2400 МГц 32 ГБ,память, работающая на MacOC Mojave v. 10.14.6, макс. 12 потоков.
Подход «data.table» был явным победителем, так же как подход «интервалов» был явным проигравшим:
ВРЕМЕННЫЕ ВЫЧИСЛЕНИЯ
"IntervalSurgeon way:"
user system elapsed
469.296 6.528 473.200
"intervals way:"
user system elapsed
2463.840 8.137 2476.685
"data.table way:"
user system elapsed
22.125 0.133 21.249
Интересно, что ни один из подходов не выиграл от установки большего числа потоков data.table с помощью setDTthreads (). Разделение набора данных на 100 равных частей и переход к подходу «data.table» через параллельный :: mclapply (mc.cores = 10) приводит к сокращению времени вычислений до 5 секунд (код также приведен ниже).
Комбинированный подход «data.table + parallel :: mclapply» также работал с намного большим набором данных из 50M ID (генерируется так же, как набор данных 1M ID, но с MM = 50). Этот набор данных содержит 500 миллионов строк, разделенных на 1000 подмножеств для mclapply. Я установил mc.cores = 4, чтобы не исчерпывать оперативную память. Вычислительное время одного прогона составило 451,485 сек., Что неплохо для ноутбука! Было бы здорово, если бы этот вид анализа однажды был включен в пакет data.table вместе с другими подобными типами интервальных анализов, как в пакете IntervalSurgeon .
GENERATINGDATASET
library(data.table)
library(fst)
rm(list = ls())
gc()
set.seed(2019)
# how many millions of IDs required?
MM <- 1
N <- 1000000 * MM
# unique IDs
IDs <- paste0("ID", 1:N)
# number of exposures per ID
K <- 10
ss <- sample(1:365, N * K, replace = T)
ff <- sample(1:365, N * K, replace = T)
ss_s <- pmin(ss, ff)
ff_s <- pmax(ss, ff)
DT <- data.table(IDs = rep(IDs, each = K),
starts = as.integer(ss_s),
ends = as.integer(ff_s + 1))
DT[order(IDs, starts, ends),]
write_fst(DT, path = paste0("fst_DT_", MM, "Mx3.fst"))
DT2
IDs starts ends
1: ID1 4 22
2: ID1 16 233
3: ID1 19 224
4: ID1 31 227
5: ID1 38 147
---
9999996: ID999999 152 310
9999997: ID999999 160 218
9999998: ID999999 180 272
9999999: ID999999 215 332
10000000: ID999999 260 265
КОД ТРИ ПОДХОДА
library(data.table)
library(IntervalSurgeon)
library(intervals)
library(fst)
rm(list = ls())
gc()
threads_no <- 2L
setDTthreads(threads_no)
print(getDTthreads())
# read the dataset generated above
#####################
DT <- read_fst("fst_DT_1Mx3.fst", as.data.table = T)
print("dataset dimentions:")
print(dim(DT))
#####################
# "intervals" function
#####################
myfun <- function( y ) {
data.table::as.data.table(
intervals::interval_union(
intervals::Intervals( as.matrix( y ) ), check_valid = TRUE )
)
}
#####################
# 1) "IntervalSurgeon" way
#####################
ptm1 <- proc.time()
tab_out1 <- DT[, as.list(data.table(
flatten(as.matrix(cbind(starts, ends))))),
by = IDs]
exec_time1 <- proc.time() - ptm1
print("IntervalSurgeon way:")
print(exec_time1)
#####################
# 2) "intervals" way
#####################
ptm2 <- proc.time()
tab_out2 <- DT[, myfun( .SD ), by = .(IDs)][,
c("V1", "V2") := lapply(.SD, as.integer), .SDcols = c("V1", "V2")]
exec_time2 <- proc.time() - ptm2
print("intervals way:")
print(exec_time2)
#####################
# 3) "data.table" way
#####################
ptm3 <- proc.time()
tab_out3 <- DT[, g := c(0L, cumsum(shift(starts, -1L) > cummax(ends))[-.N]), IDs][,
.(min(starts), max(ends)), .(g, IDs)][,g := NULL]
exec_time3 <- proc.time() - ptm3
print("data.table way:")
print(exec_time3)
#####################
print(identical(tab_out1, tab_out2))
print(identical(tab_out2, tab_out3))
"data.table + parallel :: mclapply" КОМБИНАЦИОННЫЙ ПОДХОД
library(data.table)
library(IntervalSurgeon)
library(intervals)
library(fst)
library(parallel)
rm(list = ls())
gc()
mc_cores <- 10
threads_no <- 2L
setDTthreads(threads_no)
print(getDTthreads())
DT <- read_fst("fst_DT_1Mx3.fst", as.data.table = T)
# split the dataset into G equal parts
#####################
G <- 100
nn <- nrow(DT)
step_c <- nn/G
# collect the indexes in the list
list_inx <- vector('list', G)
ii_low <- 1
ii_up <- step_c
for (i2 in 1:G) {
list_inx[[i2]] <- ii_low:ii_up
ii_low <- ii_up+1
ii_up <- step_c*(i2+1)
}
#####################
ptm3 <- proc.time()
# approach implementation
#####################
list_out <- mclapply(1:G, function(i) {
DT_tmp <- DT[list_inx[[i]],]
return(DT_tmp[, g := c(0L, cumsum(shift(starts, -1L) > cummax(ends))[-.N]), IDs][,
.(min(starts), max(ends)), .(g, IDs)][,g := NULL])
},
mc.cores = mc_cores
)
#####################
exec_time3 <- proc.time() - ptm3
print("data.table + parallel::mclapply:")
print(exec_time3)
ИНФОРМАЦИЯ О СЕССИИ
sessionInfo()
R version 3.5.2 (2018-12-20)
Platform: x86_64-apple-darwin15.6.0 (64-bit)
Running under: macOS Mojave 10.14.6
Matrix products: default
BLAS: /Library/Frameworks/R.framework/Versions/3.5/Resources/lib/libRblas.0.dylib
LAPACK: /Library/Frameworks/R.framework/Versions/3.5/Resources/lib/libRlapack.dylib
locale:
[1] en_AU.UTF-8/en_AU.UTF-8/en_AU.UTF-8/C/en_AU.UTF-8/en_AU.UTF-8
attached base packages:
[1] parallel stats graphics grDevices utils datasets methods base
other attached packages:
[1] intervals_0.15.1 IntervalSurgeon_1.0 fst_0.9.0 data.table_1.12.2
loaded via a namespace (and not attached):
[1] compiler_3.5.2 Rcpp_1.0.1