Я пытаюсь реализовать функцию в Rcpp, которая принимает матрицу в качестве входных данных и вычисляет и квантили, как указано пользователем для строки указанной матрицы. Поскольку я хочу использовать openMP, я попытался сделать это с помощью RcppEigen из-за проблем безопасности потоков.
Одна из причин, по которой это выглядит несколько сложным, заключается в том, что для эффективного расчета квантилей я попытался имитировать этот подход ( поиск квартилей , первый ответ), но учел ввод данных пользователем. По сути, я создаю вектор с индексами, соответствующими квантилям на первом шаге. На втором этапе я пытаюсь получить доступ к соответствующим значениям в цикле for.
Это код, который я пытался:
// // -*- mode: C++; c-indent-level: 4; c-basic-offset: 4; indent-tabs-mode: nil; -*-
// [[Rcpp::depends(RcppEigen)]]
#include <RcppEigen.h>
// [[Rcpp::plugins(openmp)]]
#ifdef _OPENMP
#include <omp.h>
#endif
// [[Rcpp::plugins(cpp11)]]
#include <random>
// [[Rcpp::export]]
SEXP summaryParC(const Eigen::MatrixXd x,
const Eigen::VectorXd quantiles,
int nrow, int ncol, const int ncores)
{
const int no_quantiles = quantiles.size();
Eigen::MatrixXd result(nrow, no_quantiles);
// this part is just to give me a vector of indices I need later on in the foor loop
//-----------------------------------------------
Eigen::VectorXi indices(no_quantiles +1);
indices[0] = -1;
for (int k=0; k<no_quantiles; k++){
if (quantiles[k] < 0.5){
indices[k+1] = floor(quantiles[k] * (ncol-1));
} else {
indices[k+1] = ceil(quantiles[k] * (ncol-1));
}
}
//-----------------------------------------------
#pragma omp parallel num_threads(ncores)
{
#pragma omp for
for(int i = 0; i < nrow; i++){
// I am trying to convert it into a vector so I can sort it
Eigen::VectorXd v = (x.row(i));
auto * ptr = v; // this fails
// here I want to use the pointer to access the n-th element of the vector
for(int q=0; q<no_quantiles; q++){ //quantiles
std::nth_element(ptr + indices[q] + 1, ptr + indices[q+1], ptr + ncol);
result(i,q) = *(ptr + indices[q+1]);
}
}
}
return Rcpp::wrap(result);
}
Причина, по которой я хотел определить свой собственный указатель, заключается в том, что Eigen :: VectorXd v не имеет ничего общего с v.begin (). без openMP я бы просто определил x как NumericMatrix, а v как NumericVector, и все работает отлично. Используя openMP, я не могу полагаться на то, что он поточно-ориентированный?
Это работает для небольших наборов данных, но вылетает при использовании на большей матрице:
// [[Rcpp::export]]
SEXP summaryC(NumericMatrix x,
NumericVector quantiles,
int nrow, int ncol, const int ncores)
{
const int no_quantiles = quantiles.size();
NumericMatrix result(nrow, no_quantiles);
int indices[no_quantiles +1];
//-----------------------------------------------
indices[0] = -1;
for (int k=0; k<no_quantiles; k++){
if (quantiles[k] < 0.5){
indices[k+1] = floor(quantiles[k] * (ncol-1));
} else {
indices[k+1] = ceil(quantiles[k] * (ncol-1));
}
}
//-----------------------------------------------
#pragma omp parallel num_threads(ncores)
{
#pragma omp for
for(int i = 0; i < nrow; i++){
// converting it into a vector so I can sort it
NumericVector v = (x.row(i));
for(int q=0; q<no_quantiles; q++){ //quantiles
std::nth_element(v.begin() + indices[q] + 1, v.begin() + indices[q+1], v.end());
result(i,q) = *(v.begin() + indices[q+1]);
}
}
}
return Rcpp::wrap(result);
}
Большое спасибо!
Обновление
Я реализовал подход Ральфа Стубнера. Указатель работает отлично, насколько я могу судить. (К сожалению, R все еще прерывает сеанс, когда я пытаюсь его запустить. Как отметил Дирк Эддельбюттель, использование указателя не решает проблему доступа к R-памяти).
// [[Rcpp::export]]
SEXP summaryParC(Eigen::MatrixXd x,
const Eigen::VectorXd quantiles,
int nrow, int ncol, const int ncores)
{
const int no_quantiles = quantiles.size();
Eigen::MatrixXd result(nrow, no_quantiles);
Eigen::VectorXi indices(no_quantiles +1);
indices[0] = -1;
for (int k=0; k<no_quantiles; k++){
if (quantiles[k] < 0.5){
indices[k+1] = floor(quantiles[k] * (ncol-1));
} else {
indices[k+1] = ceil(quantiles[k] * (ncol-1));
}
}
#pragma omp parallel num_threads(ncores)
{
#pragma omp for
for(int i = 0; i < nrow; i++){
Eigen::VectorXd v = (x.row(i));
double * B = v.data();
double * E = B + nrow;
for(int q=0; q<no_quantiles; q++){ //quantiles
std::nth_element(B + indices[q] + 1, B + indices[q+1], E);
result(i,q) = *(B + indices[q+1]);
}
}
}
return Rcpp::wrap(result);
}
2-е обновление : здесь более понятный пример основной проблемы. Я осознаю тот факт, что использование R-структур проблематично с openMP, но, возможно, этот пример может привести к лучшему пониманию основных причин.
// [[Rcpp::plugins(openmp)]]
// [[Rcpp::plugins(cpp11)]]
#include <Rcpp.h>
#ifdef _OPENMP
#include <omp.h>
#endif
using namespace Rcpp;
// [[Rcpp::export]]
SEXP summaryC(NumericMatrix x,
int nrow, int ncol, const int ncores)
{
NumericMatrix result(nrow, 5);
int indices[6] = {-1, 0, 249, 500, 750, 999};
// #pragma omp parallel num_threads(ncores)
{
// #pragma omp for
for(int i = 0; i < nrow; i++){
NumericVector v = (x.row(i));
for(int q=0; q < 5; q++){
std::nth_element(v.begin() + indices[q] + 1, v.begin() + indices[q+1], v.end());
result(i,q) = *(v.begin() + indices[q+1]);
}
}
}
return Rcpp::wrap(result);
}
// [[Rcpp::export]]
SEXP summaryParC(NumericMatrix x,
int nrow, int ncol, const int ncores)
{
NumericMatrix result(nrow, 5);
int indices[6] = {-1, 0, 249, 500, 750, 999};
#pragma omp parallel num_threads(ncores)
{
#pragma omp for schedule(dynamic)
for(int i = 0; i < nrow; i++){
{
NumericVector v = (x.row(i));
for(int q=0; q<5; q++){
std::nth_element(v.begin() + indices[q] + 1, v.begin() + indices[q+1], v.end());
result(i,q) = *(v.begin() + indices[q+1]);
}
}
}
}
return Rcpp::wrap(result);
}
// [[Rcpp::export]]
SEXP summaryParCorder(NumericMatrix x,
int nrow, int ncol, const int ncores)
{
NumericMatrix result(nrow, 5);
int indices[6] = {-1, 0, 249, 500, 750, 999};
#pragma omp parallel num_threads(ncores)
{
#pragma omp for ordered schedule(dynamic)
for(int i = 0; i < nrow; i++){
#pragma omp ordered
{
NumericVector v = (x.row(i));
for(int q=0; q<5; q++){
std::nth_element(v.begin() + indices[q] + 1, v.begin() + indices[q+1], v.end());
result(i,q) = *(v.begin() + indices[q+1]);
}
}
}
}
return Rcpp::wrap(result);
}
***** R - code *****
#this works, but summaryParCorder is much slower.
mbm <- microbenchmark::microbenchmark(
summaryC(x = matrix(as.numeric(1:1000000), ncol = 1000),
nrow = 1000, ncol = 1000, ncores = 4),
summaryParCorder(x = matrix(as.numeric(1:1000000), ncol = 1000),
nrow = 1000, ncol = 1000, ncores = 4),
times = 20
)
mbm
# this breaks:
summaryParC(x = matrix(as.numeric(1:1000000), ncol = 1000),
nrow = 1000, ncol = 1000, ncores = 4)