Моей первоначальной идеей было посмотреть на исходный код distHaversine
и скопировать его в функцию, которую я бы использовал с proxy
.Это будет работать так (обратите внимание, что lon
, как ожидается, будет первым столбцом):
library(geosphere)
library(dplyr)
library(proxy)
df1 <- data.frame(province = as.integer(c(1, 1, 1, 2, 2, 2)),
house = as.integer(c(1, 2, 3, 4, 5, 6)),
lat = c(-76.6, -76.5, -76.4, -75.4, -80.9, -85.7),
lon = c(39.2, 39.1, 39.3, 60.8, 53.3, 40.2))
custom_haversine <- function(x, y) {
toRad <- pi / 180
diff <- (y - x) * toRad
dLon <- diff[1L]
dLat <- diff[2L]
a <- sin(dLat / 2) ^ 2 + cos(x[2L] * toRad) * cos(y[2L] * toRad) * sin(dLon / 2) ^ 2
a <- min(a, 1)
# return
2 * atan2(sqrt(a), sqrt(1 - a)) * 6378137
}
pr_DB$set_entry(FUN=custom_haversine, names="haversine", loop=TRUE, distance=TRUE)
average_dist <- df1 %>%
select(-house) %>%
group_by(province) %>%
group_map(~ data.frame(avg=mean(proxy::dist(.x[ , c("lon", "lat")], method="haversine"))))
Однако, если вы ожидаете миллионы строк на провинцию, proxy
, вероятно, не будетвозможность выделить промежуточные (нижние треугольники) матрицы.Поэтому я перенес код на C ++ и добавил многопоточность в качестве бонуса:
EDIT : оказывается, что помощник s2d
был далек от оптимального, в этой версии теперь используются указанные формулы здесь .
EDIT2 : я только что узнал о RcppThread , и его можно использовать для обнаружения пользовательского прерывания.
// [[Rcpp::plugins(cpp11)]]
// [[Rcpp::depends(RcppParallel,RcppThread)]]
#include <cstddef> // size_t
#include <math.h> // sin, cos, sqrt, atan2, pow
#include <vector>
#include <RcppThread.h>
#include <Rcpp.h>
#include <RcppParallel.h>
using namespace std;
using namespace Rcpp;
using namespace RcppParallel;
// single to double indices for lower triangular of matrices without diagonal
void s2d(const size_t id, const size_t nrow, size_t& i, size_t& j) {
j = nrow - 2 - static_cast<size_t>(sqrt(-8 * id + 4 * nrow * (nrow - 1) - 7) / 2 - 0.5);
i = id + j + 1 - nrow * (nrow - 1) / 2 + (nrow - j) * ((nrow - j) - 1) / 2;
}
class HaversineCalculator : public Worker
{
public:
HaversineCalculator(const NumericVector& lon,
const NumericVector& lat,
double& avg,
const int n)
: lon_(lon)
, lat_(lat)
, avg_(avg)
, n_(n)
, cos_lat_(lon.length())
{
// terms for distance calculation
for (size_t i = 0; i < cos_lat_.size(); i++) {
cos_lat_[i] = cos(lat_[i] * 3.1415926535897 / 180);
}
}
void operator()(size_t begin, size_t end) {
// for Kahan summation
double sum = 0;
double c = 0;
double to_rad = 3.1415926535897 / 180;
size_t i, j;
for (size_t ind = begin; ind < end; ind++) {
if (RcppThread::isInterrupted(ind % static_cast<int>(1e5) == 0)) return;
s2d(ind, lon_.length(), i, j);
// haversine distance
double d_lon = (lon_[j] - lon_[i]) * to_rad;
double d_lat = (lat_[j] - lat_[i]) * to_rad;
double d_hav = pow(sin(d_lat / 2), 2) + cos_lat_[i] * cos_lat_[j] * pow(sin(d_lon / 2), 2);
if (d_hav > 1) d_hav = 1;
d_hav = 2 * atan2(sqrt(d_hav), sqrt(1 - d_hav)) * 6378137;
// the average part
d_hav /= n_;
// Kahan sum step
double y = d_hav - c;
double t = sum + y;
c = (t - sum) - y;
sum = t;
}
mutex_.lock();
avg_ += sum;
mutex_.unlock();
}
private:
const RVector<double> lon_;
const RVector<double> lat_;
double& avg_;
const int n_;
tthread::mutex mutex_;
vector<double> cos_lat_;
};
// [[Rcpp::export]]
double avg_haversine(const DataFrame& input, const int nthreads) {
NumericVector lon = input["lon"];
NumericVector lat = input["lat"];
double avg = 0;
int size = lon.length() * (lon.length() - 1) / 2;
HaversineCalculator hc(lon, lat, avg, size);
int grain = size / nthreads / 10;
RcppParallel::parallelFor(0, size, hc, grain);
RcppThread::checkUserInterrupt();
return avg;
}
Этот код не будет выделять какую-либо промежуточную матрицу, он просто вычислит расстояние для каждой пары того, что должно быть нижней треугольной, и накапливает значения для среднего значения в конце.См. здесь для части суммирования Кахана.
Если вы сохраните этот код, скажем, haversine.cpp
, то вы можете сделать следующее:
library(dplyr)
library(Rcpp)
library(RcppParallel)
library(RcppThread)
sourceCpp("haversine.cpp")
df1 %>%
group_by(province) %>%
group_map(~ data.frame(avg=avg_haversine(.x, parallel::detectCores())))
# A tibble: 2 x 2
# Groups: province [2]
province avg
<int> <dbl>
1 1 15379.
2 2 793612.
Вотпроверка работоспособности тоже:
pr_DB$set_entry(FUN=geosphere::distHaversine, names="distHaversine", loop=TRUE, distance=TRUE)
df1 %>%
select(-house) %>%
group_by(province) %>%
group_map(~ data.frame(avg=mean(proxy::dist(.x[ , c("lon", "lat")], method="distHaversine"))))
Слово предостережения:
df <- data.frame(lon=runif(1e3, -90, 90), lat=runif(1e3, -90, 90))
system.time(proxy::dist(df, method="distHaversine"))
user system elapsed
34.353 0.005 34.394
system.time(proxy::dist(df, method="haversine"))
user system elapsed
0.789 0.020 0.809
system.time(avg_haversine(df, 4L))
user system elapsed
0.054 0.000 0.014
df <- data.frame(lon=runif(1e5, -90, 90), lat=runif(1e5, -90, 90))
system.time(avg_haversine(df, 4L))
user system elapsed
73.861 0.238 19.670
Вам, вероятно, придется подождать некоторое время, если у вас есть миллионы строк ...
Я должен также упомянуть, что невозможно обнаружить пользовательское прерывание в потоках, созданных с помощью RcppParallel
, поэтому, если вы начнете вычисление, вам следует либо дождаться его завершения, либо полностью перезапустить R / RStudio. См. EDIT2 выше.
Относительно сложности
В зависимости от ваших фактических данных и количества ядер, имеющихся на вашем компьютере, вы вполне можете закончить в ожидании завершения расчетов.Эта проблема имеет квадратичную сложность (так сказать, на провинцию).Эта строка:
int size = lon.length() * (lon.length() - 1) / 2;
обозначает количество вычислений расстояния (haversine), которое необходимо выполнить.Таким образом, если количество строк увеличивается в n
, то количество вычислений увеличивается, в общем, в n^2 / 2
.
Нет способа оптимизировать это;Вы не можете вычислить среднее число N
чисел без фактического вычисления каждого числа в первую очередь, и вам будет трудно найти что-то более быстрое, чем многопоточный код C ++, поэтому вам придется либо подождать, либо выброситьбольше ядер в проблеме, либо с одной машиной, либо с множеством машин, работающих вместе.В противном случае вы не сможете решить эту проблему.