Я тестирую пакет RcppParallel, чтобы вычислить внутренние продукты для данных на диске (доступ к которым осуществляется через отображение памяти - аналогично большой памяти пакета).
Пример "минимального" воспроизведения:
// [[Rcpp::depends(RcppParallel, BH, bigstatsr)]]
#include <bigstatsr/BMCodeAcc.h>
#include <RcppParallel.h>
using namespace RcppParallel;
struct Sum : public Worker {
SubBMCode256Acc macc;
double xySum;
std::size_t j0, j;
// constructors
Sum(SubBMCode256Acc macc) :
macc(macc), xySum(0), j0(0), j(0) {}
Sum(const Sum& sum, std::size_t j0, std::size_t j) :
macc(sum.macc), xySum(0), j0(j0), j(j) {}
Sum(const Sum& sum, Split) :
macc(sum.macc), xySum(0), j0(sum.j0), j(sum.j) {}
// accumulate just the element of the range I've been asked to
void operator()(std::size_t begin, std::size_t end) {
for (std::size_t i = begin; i < end; i++) {
xySum += macc(i, j) * macc(i, j0);
}
}
// join results
void join(const Sum& rhs) {
xySum += rhs.xySum;
}
};
// [[Rcpp::export]]
NumericVector parallelVectorSum(Environment BM) {
XPtr<FBM> xpBM = BM["address"];
std::size_t n = xpBM->nrow();
std::size_t m = xpBM->ncol();
SubBMCode256Acc macc(xpBM, seq_len(n) - 1, seq_len(m) - 1, BM["code256"]);
int grain = std::sqrt(n);
Sum sum0(macc);
NumericVector res(m);
for (size_t j = 0; j < m; j++) {
Sum sum(sum0, 0, j);
parallelReduce(0, n, sum, grain);
res[j] = sum.xySum;
}
return res;
}
/*** R
RcppParallel::setThreadOptions(2)
library(bigsnpr)
snp <- snp_attachExtdata()
G <- snp$genotypes
test0 <- parallelVectorSum(G)
G2 <- big_copy(G, ind.row = rep(rows_along(G), 500))
dim(G2)
RcppParallel::setThreadOptions(1)
system.time(test1 <- parallelVectorSum(G2))
testthat::expect_identical(test1, 500 * test0)
RcppParallel::setThreadOptions(2)
system.time(test2 <- parallelVectorSum(G2))
testthat::expect_identical(test2, 500 * test0)
*/
Вывод:
> Rcpp::sourceCpp('tmp-tests/test-rcpp-parallel.cpp')
> RcppParallel::setThreadOptions(2)
> library(bigsnpr)
> snp <- snp_attachExtdata()
> G <- snp$genotypes
> test0 <- parallelVectorSum(G)
> G2 <- big_copy(G, ind.row = rep(rows_along(G), 500))
> dim(G2)
[1] 258500 4542
> RcppParallel::setThreadOptions(1)
> system.time(test1 <- parallelVectorSum(G2)) # 100 / 3
user system elapsed
3.621 0.423 4.045
> testthat::expect_identical(test1, 500 * test0)
> RcppParallel::setThreadOptions(2)
> system.time(test2 <- parallelVectorSum(G2)) # 177 / 39
user system elapsed
39.958 42.590 53.516
> testthat::expect_identical(test2, 500 * test0)
Использование одного потока занимает 4 секунды, а 53 секунды - 2 потока. Я немного растерялся из-за того, что могло быть причиной такой большой разницы.Любая идея ??
PS1: я запускал это на двух разных компьютерах (другие процессы не запущены).
PS2: я знаю, что вместо этого, вероятно, следует распараллелить j
,Я проверил это;это работает хорошо.Тем не менее, в реальной проблеме, которую я имею, итерации по j
не являются независимыми, так что было бы намного проще распараллелить по i
.