Как правильно использовать RcppThread для распараллеливания цикла for - PullRequest
0 голосов
/ 04 августа 2020

Я пытаюсь распараллелить a для l oop с помощью RcppThread. Непараллельная версия выглядит так:

IntegerVector simulate_pos(NumericVector x_pop,
                           NumericVector y_pop,
                           int n_studies,
                           int sample_size_min,
                           int sample_size_max,
                           bool replace,
                           float lower_limit,
                           float upper_limit){
  IntegerVector pos(n_studies);
  int npop = x_pop.size();
  NumericVector index_pop(npop);
  for (int i = 0; i < npop; i++){
    index_pop[i] = i;
  }

  // HERE IS THE LOOP-------------------------------------------------
  for (int k = 0; k < n_studies; k++){
    pos[k] = simulate_one_pos(x_pop, y_pop, index_pop, sample_size_min,
                              sample_size_max, replace, lower_limit,
                              upper_limit);
  }
  // ------------------------------------------------------------------

  return(pos);
}

Теперь я подумал, что не составит труда использовать parallelFor:

std::vector<int> simulate_pos(NumericVector x_pop,
                              NumericVector y_pop,
                              int n_studies,
                              int sample_size_min,
                              int sample_size_max,
                              bool replace,
                              float lower_limit,
                              float upper_limit,
                              int n_threads){
  std::vector<int> pos(n_studies);
  int npop = x_pop.size();
  NumericVector index_pop(npop);
  for (int i = 0; i < npop; i++){
    index_pop[i] = i;
  }
  
  // HERE IS THE LOOP-------------------------------------------------
  RcppThread::parallelFor(0, pos.size(), [&] (int i){
    pos[i] = simulate_one_pos(x_pop, y_pop, index_pop, sample_size_min,
                              sample_size_max, replace, lower_limit,
                              upper_limit);
  });
  // -----------------------------------------------------------------

  return(pos);
}

Чтобы придерживаться RcppThread-paper (https://arxiv.org/pdf/1811.00450.pdf) Я использовал std::vector в качестве возвращаемого значения вместо R cpp эквивалента IntegerVector.

Иногда код работает, иногда возникает ошибка дисбаланса стека, иногда просто зависает . Я предполагаю, что делаю большую концептуальную ошибку и должен отметить, что я в значительной степени новичок в C ++.

Проблема в том, что несколько потоков одновременно читают один и тот же адрес памяти? Или структуры данных R cpp (например, NumericVector) вызывают проблемы?

Полный код можно найти на github: https://github.com/johannes-titz/fastpos/tree/rcppthread

Для запуска самостоятельно :

devtools::install_github("johannes-titz/fastpos@rcppthread")

pop <- fastpos::create_pop(0.5, 1e5)
x <- pop[,1]
y <- pop[,2]
lower_limit <- 0.4
upper_limit <- 0.6
n_studies <- 50
sample_size_min <- 20
sample_size_max <- 1000

res <- fastpos::simulate_pos(x, y, n_studies, sample_size_min, sample_size_max, TRUE, lower_limit,
                             upper_limit, 4)

PS: также пытался использовать pool.pushReturn, но с тем же результатом.

EDIT: Проблема действительно заключалась в использовании структур данных R cpp (NumericVector) . При замене всех на std::vector работает нормально. Теперь, без сахара R cpp, мне пришлось найти способ взять сэмпл из std::vector (внутри функции, которую я вызываю в l oop), но это явно того стоит.

1 Ответ

2 голосов
/ 04 августа 2020

Вы пишете

Проблема в том, что несколько потоков одновременно читают один и тот же адрес памяти? Или структуры данных R cpp (например, NumericVector) вызывают проблемы?

, и я склонен сказать «вероятно». См. Отличную документацию в пакете RcppParallel, а также то, как он строится из небольших примеров. медленно добавляя и убеждаясь, что когда вы переходите мост добавления любой R-доступной или созданной памяти, все по-прежнему работает, как задумано.

К сожалению, мы не можем добавить параллельный внешний l oop вокруг внутреннего кода R и «надежда на лучшее». OpenMP и его друзья более требовательны, а однопоточный характер R налагает больше ограничений.

(И вы, вероятно, знаете, что можно, конечно, запустить параллелизм более высокого уровня из функций R поверх R, но это другое дело c и подход.)

...