Самое быстрое (распараллеленное) объединение больших наборов данных по ближайшим координатам? - PullRequest
1 голос
/ 28 марта 2019

Я выполняю объединение в стиле SQL двух больших координатных наборов данных фиксированного размера (широта, длина) путем поиска ближайшего соседа.В настоящее время я использую dplyr и data.table для этого.Как я могу оптимизировать и распараллелить мой код для абсолютного времени выполнения?

Предыдущие попытки включали в себя собственный python, pandas и многопроцессорную обработку, которая оказалась очень медленной.Мое текущее решение, использующее data.table для построения таблицы ближайших соседей и dplyr для объединения на основе этой таблицы, является самым быстрым, но все еще слишком медленным.

library(dplyr)
library(data.table)
library(geosphere)

source <- data.table(lat = runif(1e3), long = runif(1e3)) %>% mutate(nrow = row_number())
dest <- data.table(lat = runif(5e4), long = runif(5e4)) %>% mutate(ind = row_number())
dest_mat <- as.matrix(dest[, c('long', 'lat')])
setDT(source)
# function that returns the index of the closest matching point in dest
mindist_ind <- function(source_lat, source_long) { return(which.min(distHaversine(c(source_long, source_lat), dest_mat))) }


nn_inds <- source[, j = list(ind = mindist_ind(lat, long)), by = 1:nrow(source)] # slowest line, gets index of nearest match in dest
nn_matches <- inner_join(nn_inds, dest, by = 'ind') # join final back to dest in order to get all nearest matches
sourcedest_matches <- inner_join(source, nn_matches, by = 'nrow') # join nearest matches to source by index

Исходный файл содержит ~ 89 миллионов строк, а dest - приблизительно ~ 50 000 строк.Текущие временные параметры для источников разного размера следующие:

  • 1000 строк -> 46 секунд
  • 10000 строк -> 270 секунд
  • 100000 строк -> 2580 секунд
  • 1000000 строк -> 17172 секунд

Несмотря на то, что это самый быстрый способ, который мне удалось получить, для полного исходного файла на 89 миллионов это займет около 17-18 дней.бегать слишком долго.Я использую его на экземпляре AWS EC2 r4.16xlarge, с 488 ГБ ОЗУ, 32 ядрами и 64 виртуальными ЦП.Как я могу оптимизировать / распараллелить этот код для более быстрого запуска?

1 Ответ

1 голос
/ 29 марта 2019

Я предполагаю, что код, который вы указали в своем вопросе, на самом деле не тот, который вы хотите.Ваш код вычисляет расстояние между попарно рядами source и dest, перерабатывая source, чтобы соответствовать длине dest.

То, что вы, вероятно, хотите, и что дает этот ответ, это найтиближайшая точка в dest для каждой точки в source.(см. мои комментарии к вашему вопросу)

Расчет матриц расстояний требует больших вычислительных ресурсов.Предполагая, что R-пакеты примерно одинаково эффективны при расчете матриц расстояний, на самом деле единственный способ ускорить это - распараллелить вычисление матриц расстояний.К сожалению, матрица с большим количеством строк является опорными точками, потому что распараллеливание может происходить только по подмножествам исходных точек.(т. е. вам нужно учесть все dest точки, чтобы найти ближайшую dest точку к любой данной source)

library(parallel)
library(sp)
#nonparallel version
x2 <- copy(source)
temp <- spDists(x2[, .(long,lat)],dest_mat,longlat=TRUE)
system.time(final2 <- x2[, c("long_dest","lat_dest"):=as.list(dest_mat[apply(temp,1,which.min),]) ])

#parallel version

source_l <- split(source, rep(1:10,each=100))

cl <- makeCluster(getOption("cl.cores", 4))
clusterExport(cl, "dest_mat") #this is slow but I don't think there's a way to get around this

system.time(
  v <- parLapply(cl=cl, X=source_l, fun=function(x){
    library(sp)
    library(data.table)
    temp <- spDists(x[, .(long,lat)],dest_mat,longlat=TRUE)
    x[, c("long_dest","lat_dest"):=as.list(dest_mat[apply(temp,1,which.min),]) ]
    x
  })
)

stopCluster(cl)

final <- rbindlist(v)
identical(final[order(nrow)],final2)

Вам нужно будет поэкспериментировать с использованием более 32 процессовна самом деле ускоряет вещи.Гиперпоточность может быть смешанной, и не всегда легко предсказать, принесет ли она какую-либо пользу.К сожалению, нет гарантии, что у вас будет достаточно оперативной памяти для запуска оптимального количества процессов.Это не только медленно, но и интенсивно использует память.Если вы получаете ошибки, указывающие на то, что у вас недостаточно памяти, вам нужно уменьшить количество процессов или арендовать машину EC2 с большим объемом памяти.

Наконец, я отмечу, что which.min возвращаетИндекс первых минимумов, если есть связи.Таким образом, результаты будут зависеть от порядка строк в dest_mat.

...