Как я могу создать указатели на матрицу RcppEigen, которую я могу использовать с std :: nth_element и openMP? - PullRequest
0 голосов
/ 25 августа 2018

Я пытаюсь реализовать функцию в Rcpp, которая принимает матрицу в качестве входных данных и вычисляет и квантили, как указано пользователем для строки указанной матрицы. Поскольку я хочу использовать openMP, я попытался сделать это с помощью RcppEigen из-за проблем безопасности потоков. Одна из причин, по которой это выглядит несколько сложным, заключается в том, что для эффективного расчета квантилей я попытался имитировать этот подход ( поиск квартилей , первый ответ), но учел ввод данных пользователем. По сути, я создаю вектор с индексами, соответствующими квантилям на первом шаге. На втором этапе я пытаюсь получить доступ к соответствующим значениям в цикле for.

Это код, который я пытался:

// // -*- mode: C++; c-indent-level: 4; c-basic-offset: 4; indent-tabs-mode: nil; -*-

// [[Rcpp::depends(RcppEigen)]]
#include <RcppEigen.h>
// [[Rcpp::plugins(openmp)]]

#ifdef _OPENMP
#include <omp.h>
#endif

// [[Rcpp::plugins(cpp11)]]
#include <random>

// [[Rcpp::export]]
SEXP summaryParC(const Eigen::MatrixXd x,
                 const Eigen::VectorXd quantiles,
                 int nrow, int ncol, const int ncores)
{
  const int no_quantiles = quantiles.size();
  Eigen::MatrixXd result(nrow, no_quantiles);

  // this part is just to give me a vector of indices I need later on in the foor loop
  //-----------------------------------------------
  Eigen::VectorXi indices(no_quantiles +1);
  indices[0] = -1;
  for (int k=0; k<no_quantiles; k++){
    if (quantiles[k] < 0.5){
      indices[k+1] = floor(quantiles[k] * (ncol-1));
    } else {
      indices[k+1] = ceil(quantiles[k] * (ncol-1));
    }
  }
  //-----------------------------------------------

#pragma omp parallel num_threads(ncores)
{
#pragma omp for
  for(int i = 0; i < nrow; i++){
    // I am trying to convert it into a vector so I can sort it
    Eigen::VectorXd v = (x.row(i));
    auto * ptr = v; // this fails
    // here I want to use the pointer to access the n-th element of the vector
    for(int q=0; q<no_quantiles; q++){ //quantiles
      std::nth_element(ptr + indices[q] + 1, ptr + indices[q+1], ptr + ncol);
      result(i,q) = *(ptr + indices[q+1]);
    }
  }
}
return Rcpp::wrap(result);
}

Причина, по которой я хотел определить свой собственный указатель, заключается в том, что Eigen :: VectorXd v не имеет ничего общего с v.begin (). без openMP я бы просто определил x как NumericMatrix, а v как NumericVector, и все работает отлично. Используя openMP, я не могу полагаться на то, что он поточно-ориентированный?

Это работает для небольших наборов данных, но вылетает при использовании на большей матрице:

// [[Rcpp::export]]
SEXP summaryC(NumericMatrix x,
                 NumericVector quantiles, 
                 int nrow, int ncol, const int ncores)
{
  const int no_quantiles = quantiles.size();
  NumericMatrix result(nrow, no_quantiles);
  int indices[no_quantiles +1];
  //-----------------------------------------------
  indices[0] = -1;
  for (int k=0; k<no_quantiles; k++){
    if (quantiles[k] < 0.5){
      indices[k+1] = floor(quantiles[k] * (ncol-1));
    } else {
      indices[k+1] = ceil(quantiles[k] * (ncol-1));
    }
  }
  //-----------------------------------------------
#pragma omp parallel num_threads(ncores)
{
#pragma omp for
  for(int i = 0; i < nrow; i++){
    // converting it into a vector so I can sort it
    NumericVector v = (x.row(i));
    for(int q=0; q<no_quantiles; q++){ //quantiles
      std::nth_element(v.begin() + indices[q] + 1, v.begin() + indices[q+1], v.end());
      result(i,q) = *(v.begin() + indices[q+1]);
    }
  }
}
  return Rcpp::wrap(result);
}

Большое спасибо!

Обновление

Я реализовал подход Ральфа Стубнера. Указатель работает отлично, насколько я могу судить. (К сожалению, R все еще прерывает сеанс, когда я пытаюсь его запустить. Как отметил Дирк Эддельбюттель, использование указателя не решает проблему доступа к R-памяти).

// [[Rcpp::export]]
SEXP summaryParC(Eigen::MatrixXd x,
                 const Eigen::VectorXd quantiles,
                 int nrow, int ncol, const int ncores)
{
  const int no_quantiles = quantiles.size();
  Eigen::MatrixXd result(nrow, no_quantiles);
  Eigen::VectorXi indices(no_quantiles +1);
  indices[0] = -1;
  for (int k=0; k<no_quantiles; k++){
    if (quantiles[k] < 0.5){
      indices[k+1] = floor(quantiles[k] * (ncol-1));
    } else {
      indices[k+1] = ceil(quantiles[k] * (ncol-1));
    }
  }

#pragma omp parallel num_threads(ncores)
{
#pragma omp for
  for(int i = 0; i < nrow; i++){
    Eigen::VectorXd v = (x.row(i));
    double * B = v.data();
    double * E = B + nrow;

    for(int q=0; q<no_quantiles; q++){ //quantiles
      std::nth_element(B + indices[q] + 1, B + indices[q+1], E);
      result(i,q) = *(B + indices[q+1]);
    }
  }
}
return Rcpp::wrap(result);
}

2-е обновление : здесь более понятный пример основной проблемы. Я осознаю тот факт, что использование R-структур проблематично с openMP, но, возможно, этот пример может привести к лучшему пониманию основных причин.

// [[Rcpp::plugins(openmp)]]
// [[Rcpp::plugins(cpp11)]]
#include <Rcpp.h>
#ifdef _OPENMP
#include <omp.h>
#endif

using namespace Rcpp;

// [[Rcpp::export]]
SEXP summaryC(NumericMatrix x,
              int nrow, int ncol, const int ncores)
{
  NumericMatrix result(nrow, 5);
  int indices[6] = {-1, 0,  249,  500,  750, 999};

  //   #pragma omp parallel num_threads(ncores)
  {
    //     #pragma omp for
    for(int i = 0; i < nrow; i++){
      NumericVector v = (x.row(i));
      for(int q=0; q < 5; q++){
        std::nth_element(v.begin() + indices[q] + 1, v.begin() + indices[q+1], v.end());
        result(i,q) = *(v.begin() + indices[q+1]);
      }
    }
  }
  return Rcpp::wrap(result);
}





// [[Rcpp::export]]
SEXP summaryParC(NumericMatrix x,
                 int nrow, int ncol, const int ncores)
{
  NumericMatrix result(nrow, 5);
  int indices[6] = {-1, 0,  249,  500,  750, 999};

  #pragma omp parallel num_threads(ncores)
  {
    #pragma omp for schedule(dynamic)
      for(int i = 0; i < nrow; i++){
      {
        NumericVector v = (x.row(i));
        for(int q=0; q<5; q++){
          std::nth_element(v.begin() + indices[q] + 1, v.begin() + indices[q+1], v.end());
          result(i,q) = *(v.begin() + indices[q+1]);
        }
      }
      }
  }
return Rcpp::wrap(result);
}





// [[Rcpp::export]]
SEXP summaryParCorder(NumericMatrix x,
                 int nrow, int ncol, const int ncores)
{
  NumericMatrix result(nrow, 5);
  int indices[6] = {-1, 0,  249,  500,  750, 999};

  #pragma omp parallel num_threads(ncores)
  {
    #pragma omp for ordered schedule(dynamic)
    for(int i = 0; i < nrow; i++){
      #pragma omp ordered
      {
        NumericVector v = (x.row(i));
        for(int q=0; q<5; q++){
          std::nth_element(v.begin() + indices[q] + 1, v.begin() + indices[q+1], v.end());
          result(i,q) = *(v.begin() + indices[q+1]);
        }
      }
    }
  }
return Rcpp::wrap(result);
}




***** R - code *****
#this works, but summaryParCorder is much slower. 
mbm <- microbenchmark::microbenchmark(
  summaryC(x = matrix(as.numeric(1:1000000), ncol = 1000), 
           nrow = 1000, ncol = 1000, ncores = 4),

  summaryParCorder(x = matrix(as.numeric(1:1000000), ncol = 1000), 
              nrow = 1000, ncol = 1000, ncores = 4),
  times = 20
)
mbm

# this breaks:
summaryParC(x = matrix(as.numeric(1:1000000), ncol = 1000), 
                 nrow = 1000, ncol = 1000, ncores = 4)

1 Ответ

0 голосов
/ 25 августа 2018

Я не проверял совместимость с OpenMP, но Eigen::VectorXd::data() дает вам требуемый указатель, если рассматриваемый вектор не const:

// [[Rcpp::depends(RcppEigen)]]
#include <RcppEigen.h>

// [[Rcpp::export]]
Eigen::VectorXd quantiles(Eigen::VectorXd x, const Eigen::VectorXi& indices) {
  Eigen::VectorXd result(indices.size());

  std::nth_element(x.data(), x.data() + indices[0], x.data() + x.size());
  result(0) = x[indices[0]];

  for (int i = 1; i < indices.size(); ++i) {
    std::nth_element(x.data() + indices[i - 1] + 1,
                     x.data() + indices[i],
                     x.data() + x.size());
    result(i) = x[indices[i]];
  }
  return result;
}

/*** R
set.seed(42)
x <- runif(12)
i <- sort(sample(seq_len(12), 3)) - 1
quantiles(x, i)
*/

Здесь полное решение, включая OpenMP:

// [[Rcpp::plugins(openmp)]]
// [[Rcpp::plugins(cpp11)]]
// [[Rcpp::depends(RcppEigen)]]
#include <RcppEigen.h>

using namespace Rcpp;

// [[Rcpp::export]]
NumericMatrix summaryC(NumericMatrix x, int nrow, int ncores)
{
  NumericMatrix result(nrow, 5);
  int indices[6] = {-1, 0,  249,  500,  750, 999};

  for (int i = 0; i < nrow; i++) {
    NumericVector v = (x.row(i));
    for (int q = 0; q < 5; ++q) {
      std::nth_element(v.begin() + indices[q] + 1, v.begin() + indices[q+1], v.end());
      result(i,q) = *(v.begin() + indices[q+1]);
    }
  }
  return result;
}

// [[Rcpp::export]]
Eigen::MatrixXd summaryParC(Eigen::MatrixXd x,int nrow, int ncores) {
  Eigen::MatrixXd result(nrow, 5);
  int indices[6] = {-1, 0,  249,  500,  750, 999};

  #pragma omp parallel num_threads(ncores)
  {
    #pragma omp for schedule(dynamic)
      for (int i = 0; i < nrow; i++) {
        Eigen::VectorXd v = x.row(i);
        for (int q = 0; q < 5; ++q) {
          std::nth_element(v.data() + indices[q] + 1,
               v.data() + indices[q+1],
               v.data() + v.size());
          result(i,q) = v[indices[q+1]];
        }
      }
  }
  return result;
}

/*** R 
x <- matrix(as.numeric(1:1000000), ncol = 1000)
microbenchmark::microbenchmark(
   summaryC = summaryC(x = x, nrow = 1000, ncores = 4),
  summaryParC = summaryParC(x = x, nrow = 1000, ncores = 4),
  times = 100)
*/

Я никогда не видел сбоя с этой параллельной версией.А на моей двухъядерной машине он примерно на 44% быстрее, чем последовательный код.

...