RcppParallel изменения результатов с несколькими потоками - PullRequest
0 голосов
/ 14 мая 2019

Я новичок в Rcpp и RcppParallel.Я пытаюсь использовать RcppParallel для оптимизации моего R-кода, и сейчас я делаю несколько игрушечных кодов для их изучения.Теперь я сделал код RcppParallel, и результат отличается от того, что я думал.Результат меняется всякий раз, когда я пытаюсь использовать функцию.

Вот мой код

library(Rcpp)
library(RcppParallel)
library(RcppArmadillo)
library(data.table)
library(pryr)

#key<-rep(rep(c("a","b"),each=12500000))
grp<-rep(rep(c("a","b","c","d","e"),each=2500000),2)
val<-rnorm(25000000,0,8)
dat<-setDT(data.frame(grp=grp,val=val))
#raw<-setDT(data.frame(key=key,grp=grp,val=val))
na.omit(dat)
#setkey(dat,"key","grp")
setkey(dat,"grp")
#key<-as.factor(key)
grp<-as.factor(grp)
gc()

sourceCpp("test.cpp")

set.seed(1)
dist<-do.call(rbind,tapply(1:NROW(dat),as.factor(dat$grp),function(x) stats::rmultinom(1,NROW(x),rep(1/NROW(x),NROW(x)))))  

setThreadOptions(numThreads=4)
for(i in 1:10) print(test(dat,table(dat$grp),dist))
setThreadOptions(numThreads=1)
for(i in 1:10) print(test(dat,table(dat$grp),dist))

Вот код Rcpp

#include <Rcpp.h>
#include <RcppParallel.h>

using namespace Rcpp;
using namespace RcppParallel;

// [[Rcpp::depends(RcppParallel)]]

struct INDSUM : public Worker
{
  const RVector<double> val;
  const RVector<int> dist;
  RVector<double> output;

  INDSUM(const NumericVector &_val, const IntegerVector &_dist, NumericVector &_output)
    : val(_val),dist(_dist),output(_output) {}

  void operator()(size_t begin, size_t end)
  {
    for(size_t i=begin; i< end; i++)
    {
      output[0] += val[i]*dist[i];
      output[1] += val[i]*val[i]*dist[i];
    }
  }
};

// [[Rcpp::export]]
NumericMatrix test(DataFrame &df, NumericVector &grptable, IntegerVector &dist) {

  IntegerVector idx = df[0];
  NumericVector val = df[1];
  size_t grpnum=grptable.length();

  NumericMatrix output(grpnum,2);
  NumericVector tmp(2);
  NumericVector sum(grpnum);
  NumericVector sumsq(grpnum);

  INDSUM indexsum(val, dist, tmp);

  for(size_t j=0, cnt=0 ; j<grpnum; j++)
  {
    parallelFor(cnt,cnt+grptable[j],indexsum);
    sum[j] = tmp[0];
    sumsq[j] = tmp[1];
    tmp[0]=tmp[1]=0;
    cnt+=grptable[j];

    output(j,0)=sum[j];
    output(j,1)=sumsq[j];
  }
  return output;
}

Когда число потоков равно 4,результат выглядит следующим образом:

[1,]  -1328.307 17484320
[2,]  -8175.153 96214984
[3,] -17317.002 80623284
[4,]  -7964.977 83306470
[5,]  16800.532 82033877
           [,1]     [,2]
[1,]  -6886.349 79063639
[2,] -19529.382 80570964
[3,] -21653.201 80363894
[4,]  -3256.842 81909243
[5,]   2266.153 79906235
           [,1]     [,2]
[1,] -12306.965 80778175
[2,]   2490.576 80411474
[3,]  -6631.620 80495938
[4,]   1477.019 52269743
[5,]  -1167.497 92402507
           [,1]     [,2]
[1,] -15329.571 81025309
[2,] -10860.718 76984730
[3,]   2430.499 96612706
[4,]  17321.521 97019020
[5,]   6702.637 81961180
           [,1]     [,2]
[1,] -20691.132 80094518
[2,]  -7922.633 80567608
[3,]  -6579.185 83056898
[4,]  24761.582 80163577
[5,]  -8022.315 80314909
           [,1]     [,2]
[1,] -18693.011 80439056
[2,]  -9705.923 80580004
[3,] -20444.317 89155340
[4,]  13285.827 80487654
[5,]  14155.066 84664113
            [,1]      [,2]
[1,] -17247.0503  80775485
[2,] -15141.5544 107365503
[3,]    315.3996  91628341
[4,]   4731.3399  81038215
[5,]   8374.9257  77583065
            [,1]     [,2]
[1,]   -973.0294 81194199
[2,] -13473.3673 79293218
[3,] -11028.3987 80865948
[4,]   1126.7350 80539346
[5,]  12452.9127 79909947
           [,1]     [,2]
[1,] -18049.085 80850260
[2,] -11117.905 81232899
[3,] -12195.323 80512124
[4,]  -9120.614 80557568
[5,]   6800.466 76608380
           [,1]     [,2]
[1,] -33611.785 76090786
[2,] -10439.372 85329662
[3,]  -1847.212 92939924
[4,]  -3906.611 86959278
[5,]  -5839.027 85420969

В то время как когда я использую один поток, я получаю результат:

           [,1]      [,2]
[1,] -59155.958 319664972
[2,] -26054.697 320363256
[3,] -40476.176 320388784
[4,]   6573.581 320427866
[5,]  30657.222 319834396

Спасибо вам.

1 Ответ

2 голосов
/ 14 мая 2019

В нашем коде есть условие гонки (в данном случае также называемое гонкой данных), поскольку все потоки записывают в один и тот же вектор двух элементов tmp. Однако для увеличения значения требуется три шага:

  1. чтение значения
  2. увеличение прочитанного значения
  3. запись увеличенного значения

Если все потоки делают это параллельно без блокировки, один поток будет выполнять обратную запись между чтением и записью другого потока, отбрасывая приращение, выполненное первым потоком.

Одним из решений было бы использование выходных переменных для каждого потока, которые собираются после символов в последовательном коде. Также может иметь смысл заглянуть в RcppParallel::parallelReduce, поскольку вы пытаетесь выполнить операцию редуцирования.

...