Применить функцию к нескольким группам, используя функции Rcpp и R - PullRequest
0 голосов
/ 17 ноября 2018

Я пытаюсь применить функцию к нескольким группам / идентификаторам в r, используя пакет foreach.Работа с параллельной обработкой через %dopar% занимает целую вечность, поэтому мне было интересно, возможно ли запустить apply или часть цикла в c++ через rcpp или другие пакеты, чтобы сделать его быстрее.Я не знаком с c++ или другими пакетами, которые могут сделать это, поэтому я надеюсь узнать, возможно ли это.Пример кода приведен ниже.Моя действительная функция длиннее с более чем 20 входами и занимает даже больше времени, чем то, что я публикую

Я ценю помощь.

РЕДАКТИРОВАТЬ: Я понял, что мой начальныйВопрос был неопределенным, поэтому я постараюсь сделать лучше.У меня есть таблица с данными временных рядов по группам.Каждая группа имеет> 10K строк.Я написал в c++ через rcpp функцию, которая фильтрует таблицу по группам и применяет функцию.Я хотел бы пройтись по уникальным группам и объединить результаты, как rbind, используя rcpp, чтобы он работал быстрее.См. Пример кода ниже (моя реальная функция длиннее)

library(data.table)
library(inline)
library(Rcpp)
library(stringi)
library(Runuran)

# Fake data
DT <- data.table(Group = rep(do.call(paste0, Map(stri_rand_strings, n=10, length=c(5, 4, 1),
                                                   pattern = c('[A-Z]', '[0-9]', '[A-Z]'))), 180))

df <- DT[order(Group)][
  , .(Month = seq(1, 180, 1),
      Col1 = urnorm(180, mean = 500, sd = 1, lb = 5, ub = 1000), 
      Col2 = urnorm(180, mean = 1000, sd = 1, lb = 5, ub = 1000), 
      Col3 = urnorm(180, mean = 300, sd = 1, lb = 5, ub = 1000)), 
  by = Group
  ]

# Rcpp function
#include <Rcpp.h>
using namespace Rcpp;
// [[Rcpp::plugins(cpp11)]]

// [[Rcpp::export]]
DataFrame testFunc(DataFrame df, StringVector ids, double var1, double var2) {

  // Filter by group
  using namespace std;  
  StringVector sub = df["Group"];
  std::string level = Rcpp::as<std::string>(ids[0]);
  Rcpp::LogicalVector ind(sub.size());
  for (int i = 0; i < sub.size(); i++){
    ind[i] = (sub[i] == level);
  }

  // Access the columns
  CharacterVector Group = df["Group"];
  DoubleVector Month = df["Month"];
  DoubleVector Col1 = df["Col1"];
  DoubleVector Col2 = df["Col2"];
  DoubleVector Col3 = df["Col3"];


  // Create calculations
  DoubleVector Cola = Col1 * (var1 * var2);
  DoubleVector Colb = Col2 * (var1 * var2);
  DoubleVector Colc = Col3 * (var1 * var2);
  DoubleVector Cold = (Cola + Colb + Colc);

  // Result summary
  std::string Group_ID = level;
  double SumCol1 = sum(Col1);
  double SumCol2 = sum(Col2);
  double SumCol3 = sum(Col3);
  double SumColAll = sum(Cold);

  // return a new data frame
  return DataFrame::create(_["Group_ID"]= Group_ID, _["SumCol1"]= SumCol1,
                            _["SumCol2"]= SumCol2, _["SumCol3"]= SumCol3, _["SumColAll"]= SumColAll);
}

# Test function
Rcpp::sourceCpp('sample.cpp')
testFunc(df, ids = "BFTHU1315C", var1 = 24, var2 = 76) # ideally I would like to loop through all groups (unique(df$Group))

#     Group_ID  SumCol1 SumCol2  SumCol3  SumColAll
# 1 BFTHU1315C 899994.6 1798561 540001.6 5907129174

Заранее спасибо.

1 Ответ

0 голосов
/ 19 ноября 2018

Я бы предложил переосмыслить наш подход.Ваш тестовый набор данных, который, как я полагаю, сопоставим с вашим реальным набором данных, имеет 3e8 строк.Я оцениваю около 10 ГБ данных.С этими данными вы, похоже, делаете следующее:

  • Определите список уникальных идентификаторов (около 5e5)
  • Создайте одну задачу для каждого уникального идентификатора
  • Каждый из этихtasks получает полный набор данных и отфильтровывает все данные, которые не относятся к рассматриваемому идентификатору
  • Каждая из этих задач добавляет дополнительные столбцы, которые не зависят от идентификатора
  • Каждый изtasks выполняет group_b(ID), но в наборе данных остается только один идентификатор
  • Каждая из задач рассчитывает несколько простых средств

Мне это кажется очень неэффективным в отношении использования памяти,Вообще говоря, для таких задач вы бы хотели «параллелизм разделяемой памяти», но foreach дает вам только «параллелизм процесса».Недостатком параллелизма процессов является то, что он увеличивает стоимость памяти.

Кроме того, вы отбрасываете весь код группировки и агрегирования, который существует в базе данных R / dplyr / data.table / SQL engine / / ...Маловероятно, что вы или кто-либо другой, кто читает ваш вопрос здесь, сможет улучшить эти существующие кодовые базы.

Мои предложения:

  • Забудьте о «параллелизме процесса» (длясейчас)
  • Если у вас достаточно ОЗУ, попробуйте использовать простой dplyr канал с mutate / group_by / summarize.
  • Если это не достаточно быстро, узнайте, какагрегация работает с data.table, который, как известно, работает быстрее и предлагает «параллелизм совместно используемой памяти» через OpenMP.
  • Если на вашем компьютере недостаточно памяти и происходит перестановка, найдите возможности выхода извычисление памяти.Лично я бы использовал (встроенную) базу данных.

Чтобы сделать это более явным.Вот единственное решение data.table:

library(data.table)
library(stringi)

# Fake data
set.seed(42)
var1 <- 24
var2 <- 76

DT <- data.table(Group = rep(do.call(paste0, Map(stri_rand_strings, n=10, length=c(5, 4, 1),
                                                 pattern = c('[A-Z]', '[0-9]', '[A-Z]'))), 180))
setkey(df, Group)

df <- DT[order(Group)][
  , .(Month = seq(1, 180, 1),
      Col1 = rnorm(180, mean = 500, sd = 1), 
      Col2 = rnorm(180, mean = 1000, sd = 1), 
      Col3 = rnorm(180, mean = 300, sd = 1)), 
  by = Group
  ][, c("Cola", "Colb", "Colc") := .(Col1 * (var1 * var2), 
                                     Col2 * (var1 * var2),
                                     Col3 * (var1 * var2))
    ][, Cold := Cola + Colb + Colc]


# aggregagation
df[, .(SumCol1 = sum(Col1),
       SumCol2 = sum(Col2),
       SumCol3 = sum(Col3),
       SumColAll = sum(Cold)), by = Group]

Я добавляю вычисленные столбцы по ссылке.На этапе агрегирования используются функции группировки, предоставляемые data.table.Если ваша агрегация более сложная, вы также можете использовать функцию:

# aggregation function
mySum <- function(Col1, Col2, Col3, Cold) {
  list(SumCol1 = sum(Col1),
       SumCol2 = sum(Col2),
       SumCol3 = sum(Col3),
       SumColAll = sum(Cold))
}

df[, mySum(Col1, Col2, Col3, Cold), by = Group]

И если агрегация может быть быстрее при использовании C ++ (не в таких случаях, как sum!), Вы можете дажеиспользуйте это:

# aggregation function in C++
Rcpp::cppFunction('
Rcpp::List mySum(Rcpp::NumericVector Col1, 
                 Rcpp::NumericVector Col2, 
                 Rcpp::NumericVector Col3, 
                 Rcpp::NumericVector Cold) {
    double SumCol1 = Rcpp::sum(Col1);
    double SumCol2 = Rcpp::sum(Col2);
    double SumCol3 = Rcpp::sum(Col3);
    double SumColAll = Rcpp::sum(Cold);             
    return Rcpp::List::create(Rcpp::Named("SumCol1") = SumCol1,
                              Rcpp::Named("SumCol2") = SumCol2,
                              Rcpp::Named("SumCol3") = SumCol3,
                              Rcpp::Named("SumColAll") = SumColAll);
}
')

df[, mySum(Col1, Col2, Col3, Cold), by = Group]

Во всех этих примерах нащупывание и зацикливание оставлено на data.table, так как вы ничего не получите, делая это самостоятельно.

...