Версия RcppParallel намного быстрее при использовании только одного потока - PullRequest
0 голосов
/ 25 ноября 2018

Я тестирую пакет RcppParallel, чтобы вычислить внутренние продукты для данных на диске (доступ к которым осуществляется через отображение памяти - аналогично большой памяти пакета).

Пример "минимального" воспроизведения:

// [[Rcpp::depends(RcppParallel, BH, bigstatsr)]]
#include <bigstatsr/BMCodeAcc.h>
#include <RcppParallel.h>
using namespace RcppParallel;

struct Sum : public Worker {

  SubBMCode256Acc macc;
  double xySum;
  std::size_t j0, j;

  // constructors
  Sum(SubBMCode256Acc macc) :
    macc(macc),     xySum(0), j0(0), j(0) {}
  Sum(const Sum& sum, std::size_t j0, std::size_t j) :
    macc(sum.macc), xySum(0), j0(j0), j(j) {}
  Sum(const Sum& sum, Split) :
    macc(sum.macc), xySum(0), j0(sum.j0), j(sum.j) {}

  // accumulate just the element of the range I've been asked to
  void operator()(std::size_t begin, std::size_t end) {
    for (std::size_t i = begin; i < end; i++) {
      xySum += macc(i, j) * macc(i, j0);
    }
  }
  // join results
  void join(const Sum& rhs) {
    xySum += rhs.xySum;
  }
};

// [[Rcpp::export]]
NumericVector parallelVectorSum(Environment BM) {

  XPtr<FBM> xpBM = BM["address"];
  std::size_t n = xpBM->nrow();
  std::size_t m = xpBM->ncol();
  SubBMCode256Acc macc(xpBM, seq_len(n) - 1, seq_len(m) - 1, BM["code256"]);

  int grain = std::sqrt(n);

  Sum sum0(macc);
  NumericVector res(m);
  for (size_t j = 0; j < m; j++) {
    Sum sum(sum0, 0, j);
    parallelReduce(0, n, sum, grain);
    res[j] = sum.xySum;
  }

  return res;
}


/*** R
RcppParallel::setThreadOptions(2)
library(bigsnpr)
snp <- snp_attachExtdata()
G <- snp$genotypes
test0 <- parallelVectorSum(G)

G2 <- big_copy(G, ind.row = rep(rows_along(G), 500))
dim(G2)
RcppParallel::setThreadOptions(1)
system.time(test1 <- parallelVectorSum(G2))
testthat::expect_identical(test1, 500 * test0)
RcppParallel::setThreadOptions(2)
system.time(test2 <- parallelVectorSum(G2))
testthat::expect_identical(test2, 500 * test0)
*/

Вывод:

> Rcpp::sourceCpp('tmp-tests/test-rcpp-parallel.cpp')

> RcppParallel::setThreadOptions(2)

> library(bigsnpr)

> snp <- snp_attachExtdata()

> G <- snp$genotypes

> test0 <- parallelVectorSum(G)

> G2 <- big_copy(G, ind.row = rep(rows_along(G), 500))

> dim(G2)
[1] 258500   4542

> RcppParallel::setThreadOptions(1)

> system.time(test1 <- parallelVectorSum(G2))  # 100 / 3
   user  system elapsed 
  3.621   0.423   4.045 

> testthat::expect_identical(test1, 500 * test0)

> RcppParallel::setThreadOptions(2)

> system.time(test2 <- parallelVectorSum(G2))  # 177 / 39
   user  system elapsed 
 39.958  42.590  53.516 

> testthat::expect_identical(test2, 500 * test0)

Использование одного потока занимает 4 секунды, а 53 секунды - 2 потока. Я немного растерялся из-за того, что могло быть причиной такой большой разницы.Любая идея ??

PS1: я запускал это на двух разных компьютерах (другие процессы не запущены).

PS2: я знаю, что вместо этого, вероятно, следует распараллелить j,Я проверил это;это работает хорошо.Тем не менее, в реальной проблеме, которую я имею, итерации по j не являются независимыми, так что было бы намного проще распараллелить по i.

...