Как записать непересекающиеся диапазоны (или временные интервалы) в R «data.table»? - PullRequest
0 голосов
/ 14 октября 2019

Моя цель состояла в том, чтобы определить уникальные непересекающиеся диапазоны (или временные интервалы) для каждого идентификатора, когда каждый идентификатор имел несколько, потенциально перекрывающихся диапазонов воздействия. Я обнаружил, что функция "flatten" из пакета R "IntervalSurgeon" может реализовать эту задачу. Мой вопрос: как эффективно реализовать ту же задачу и получить тот же вывод "tab_out" способом "data.table"?

library(data.table)
library(IntervalSurgeon)

set.seed(2019)

N <- 3 # number of IDs

IDs <- paste0("ID", 1:N) # unique IDs

K <- 4 # number of exposures per ID

DT <- data.table(IDs = rep(IDs, each = K), 
    starts = sample(1:20, N * K, replace = T))[,
    ends := starts + sample(1:5, N * K, replace = T)]


DT <- DT[order(IDs, starts),]

tab_out <- DT[, as.list(data.table(
    flatten(as.matrix(cbind(starts, ends))))), 
    by = IDs]

DT
    IDs starts ends
 1: ID1      7   11
 2: ID1     13   17
 3: ID1     15   16
 4: ID1     16   18
 5: ID2      1    5
 6: ID2      1    4
 7: ID2      2    3
 8: ID2     17   19
 9: ID3      3    6
10: ID3     13   16
11: ID3     14   15
12: ID3     16   21

tab_out
   IDs V1 V2
1: ID1  7 11
2: ID1 13 18
3: ID2  1  5
4: ID2 17 19
5: ID3  3  6
6: ID3 13 21

Ответы [ 3 ]

1 голос
/ 15 октября 2019

Заимствование идеи из решения Дэвида Ауренбурга здесь

DT[, g := c(0L, cumsum(shift(starts, -1L) > cummax(ends))[-.N]), IDs][,
    .(min(starts), max(ends)), .(g, IDs)]

вывод:

   g IDs V1 V2
1: 0 ID1  7 11
2: 1 ID1 13 18
3: 0 ID2  1  5
4: 1 ID2 17 19
5: 0 ID3  3  6
6: 1 ID3 13 21
0 голосов
/ 17 октября 2019

Ниже приведено время вычислений для подходов «IntervalSurgeon», «interval» и «data.table». Время указано для набора данных, содержащего один миллион идентификаторов, с 10 экспозициями на каждый идентификатор, то есть 10 миллионами строк. Я сделал один прогон из-за того, что подход «интервалов» занимает слишком много времени.

Машина была MacBook Pro (15 дюймов, 2018), с процессором Intel Core i9 2,9 ГГц, DDR4 2400 МГц 32 ГБ,память, работающая на MacOC Mojave v. 10.14.6, макс. 12 потоков.

Подход «data.table» был явным победителем, так же как подход «интервалов» был явным проигравшим:

ВРЕМЕННЫЕ ВЫЧИСЛЕНИЯ

"IntervalSurgeon way:"
   user  system elapsed 
469.296   6.528 473.200 

"intervals way:"
    user   system  elapsed 
2463.840    8.137 2476.685 

"data.table way:"
   user  system elapsed 
 22.125   0.133  21.249 

Интересно, что ни один из подходов не выиграл от установки большего числа потоков data.table с помощью setDTthreads (). Разделение набора данных на 100 равных частей и переход к подходу «data.table» через параллельный :: mclapply (mc.cores = 10) приводит к сокращению времени вычислений до 5 секунд (код также приведен ниже).

Комбинированный подход «data.table + parallel :: mclapply» также работал с намного большим набором данных из 50M ID (генерируется так же, как набор данных 1M ID, но с MM = 50). Этот набор данных содержит 500 миллионов строк, разделенных на 1000 подмножеств для mclapply. Я установил mc.cores = 4, чтобы не исчерпывать оперативную память. Вычислительное время одного прогона составило 451,485 сек., Что неплохо для ноутбука! Было бы здорово, если бы этот вид анализа однажды был включен в пакет data.table вместе с другими подобными типами интервальных анализов, как в пакете IntervalSurgeon .

GENERATINGDATASET

library(data.table)
library(fst)

rm(list = ls())
gc()

set.seed(2019)

# how many millions of IDs required?
MM <- 1 
N <- 1000000 * MM

# unique IDs
IDs <- paste0("ID", 1:N)

# number of exposures per ID
K <- 10 

ss <- sample(1:365, N * K, replace = T)
ff <- sample(1:365, N * K, replace = T)

ss_s <- pmin(ss, ff)
ff_s <- pmax(ss, ff)

DT <- data.table(IDs = rep(IDs, each = K), 
     starts = as.integer(ss_s), 
        ends =  as.integer(ff_s + 1))

DT[order(IDs, starts, ends),]

write_fst(DT, path = paste0("fst_DT_", MM, "Mx3.fst"))

DT2
                IDs starts ends
       1:      ID1      4   22
       2:      ID1     16  233
       3:      ID1     19  224
       4:      ID1     31  227
       5:      ID1     38  147
      ---                     
 9999996: ID999999    152  310
 9999997: ID999999    160  218
 9999998: ID999999    180  272
 9999999: ID999999    215  332
10000000: ID999999    260  265

КОД ТРИ ПОДХОДА

library(data.table)
library(IntervalSurgeon)
library(intervals)
library(fst)

rm(list = ls())
gc()

threads_no <- 2L
setDTthreads(threads_no)
print(getDTthreads())

# read the dataset generated above
#####################
DT <- read_fst("fst_DT_1Mx3.fst", as.data.table = T)

print("dataset dimentions:")
print(dim(DT))
#####################

# "intervals" function
#####################
myfun <- function( y ) {
  data.table::as.data.table( 
    intervals::interval_union(
      intervals::Intervals( as.matrix( y ) ), check_valid = TRUE ) 
    )
}
#####################

# 1) "IntervalSurgeon" way
#####################
ptm1 <- proc.time()

tab_out1 <- DT[, as.list(data.table(
    flatten(as.matrix(cbind(starts, ends))))), 
        by = IDs]

exec_time1 <- proc.time() - ptm1
print("IntervalSurgeon way:")
print(exec_time1)
#####################       

# 2) "intervals" way  
##################### 
ptm2 <- proc.time()

tab_out2 <- DT[, myfun( .SD ), by = .(IDs)][,
    c("V1", "V2") := lapply(.SD, as.integer), .SDcols = c("V1", "V2")]

exec_time2 <- proc.time() - ptm2
print("intervals way:")
print(exec_time2)
##################### 

# 3) "data.table" way
#####################
ptm3 <- proc.time()

tab_out3 <- DT[, g := c(0L, cumsum(shift(starts, -1L) > cummax(ends))[-.N]), IDs][,
    .(min(starts), max(ends)), .(g, IDs)][,g := NULL]

exec_time3 <- proc.time() - ptm3
print("data.table way:")
print(exec_time3)
#####################  

print(identical(tab_out1, tab_out2))
print(identical(tab_out2, tab_out3))

"data.table + parallel :: mclapply" КОМБИНАЦИОННЫЙ ПОДХОД

library(data.table)
library(IntervalSurgeon)
library(intervals)
library(fst)
library(parallel)

rm(list = ls())
gc()

mc_cores <- 10

threads_no <- 2L
setDTthreads(threads_no)
print(getDTthreads())

DT <- read_fst("fst_DT_1Mx3.fst", as.data.table = T)

# split the dataset into G equal parts
##################### 
G <- 100
nn <- nrow(DT)
step_c <- nn/G

# collect the indexes in the list
list_inx <- vector('list', G)

ii_low <- 1
ii_up <- step_c
for (i2 in 1:G) {

    list_inx[[i2]] <- ii_low:ii_up 
    ii_low <- ii_up+1
    ii_up <- step_c*(i2+1)

    }
##################### 

ptm3 <- proc.time()

# approach implementation
#####################
list_out <- mclapply(1:G, function(i) {
            DT_tmp <- DT[list_inx[[i]],]
            return(DT_tmp[, g := c(0L, cumsum(shift(starts, -1L) > cummax(ends))[-.N]), IDs][,
                .(min(starts), max(ends)), .(g, IDs)][,g := NULL])
                },
                mc.cores = mc_cores
                )
#####################

exec_time3 <- proc.time() - ptm3
print("data.table + parallel::mclapply:")
print(exec_time3)

ИНФОРМАЦИЯ О СЕССИИ

sessionInfo()
R version 3.5.2 (2018-12-20)
Platform: x86_64-apple-darwin15.6.0 (64-bit)
Running under: macOS Mojave 10.14.6

Matrix products: default
BLAS: /Library/Frameworks/R.framework/Versions/3.5/Resources/lib/libRblas.0.dylib
LAPACK: /Library/Frameworks/R.framework/Versions/3.5/Resources/lib/libRlapack.dylib

locale:
[1] en_AU.UTF-8/en_AU.UTF-8/en_AU.UTF-8/C/en_AU.UTF-8/en_AU.UTF-8

attached base packages:
[1] parallel  stats     graphics  grDevices utils     datasets  methods   base     

other attached packages:
[1] intervals_0.15.1    IntervalSurgeon_1.0 fst_0.9.0           data.table_1.12.2  

loaded via a namespace (and not attached):
[1] compiler_3.5.2 Rcpp_1.0.1 
0 голосов
/ 14 октября 2019

Вот решение с использованием пакета intervals.

пример данных

library( data.table )
library( intervals )

DT <- fread("
IDs starts ends
ID1      7   11
ID1     13   17
ID1     15   16
ID1     16   18
ID2      1    5
ID2      1    4
ID2      2    3
ID2     17   19
ID3      3    6
ID3     13   16
ID3     14   15
ID3     16   21")

код

myfun <- function( y ) {
  data.table::as.data.table( 
    intervals::interval_union(
      intervals::Intervals( as.matrix( y ) ), check_valid = TRUE ) 
    )
}

DT[, myfun( .SD ), by = .(IDs)]

#    IDs V1 V2
# 1: ID1  7 11
# 2: ID1 13 18
# 3: ID2  1  5
# 4: ID2 17 19
# 5: ID3  3  6
# 6: ID3 13 21
...