foreach% dopar% + RPostgreSQL - PullRequest
       9

foreach% dopar% + RPostgreSQL

7 голосов
/ 11 октября 2010

Я использую RPostgreSQL для подключения к локальной базе данных. Настройка отлично работает на моей машине с Linux. R 2.11.1, Postgres 8.4.

Я играл с 'foreach' с многоядерным (doMC) параллельным бэкэндом, чтобы обернуть некоторые повторяющиеся запросы (насчитывающие несколько тысяч) и добавить результаты в структуру данных. Любопытно, что он работает, если я использую% do%, но не работает, когда я переключаюсь на% dopar%, за исключением того, что есть только одна итерация (как показано ниже)

Мне было интересно, связано ли это с одним объектом подключения, поэтому я создал 10 объектов подключения, и в зависимости от того, что было «i», для этого запроса был задан определенный объект con, в зависимости от i по модулю 10. ниже всего 2 объекта подключения). Выражение, которое оценивается как eval (expr.01), содержит / является запросом, который зависит от того, что такое «i».

Я не могу понять эти конкретные сообщения об ошибках. Мне интересно, есть ли способ сделать эту работу.

Спасибо.
Вишал Белсаре

Фрагмент R следует:

> id.qed2.foreach <- foreach(i = 1588:1588, .inorder=FALSE) %dopar% { 
+ if (i %% 2 == 0) {con <- con0}; 
+ if (i %% 2 == 1) {con <- con1}; 
+ fetch(dbSendQuery(con,eval(expr.01)),n=-1)$idreuters};
> id.qed2.foreach
[[1]]
  [1]   411   414  2140  2406  4490  4507  4519  4570  4571  4572  4703  4731
[109] 48765 84312 91797

> id.qed2.foreach <- foreach(i = 1588:1589, .inorder=FALSE) %dopar% { 
+ if (i %% 2 == 0) {con <- con0}; 
+ if (i %% 2 == 1) {con <- con1}; 
+ fetch(dbSendQuery(con,eval(expr.01)),n=-1)$idreuters};
Error in stop(paste("expired", class(con))) : 
  no function to return from, jumping to top level
Error in stop(paste("expired", class(con))) : 
  no function to return from, jumping to top level
Error in { : 
  task 1 failed - "error in evaluating the argument 'res' in selecting a method for function 'fetch'"
> 

РЕДАКТИРОВАТЬ: Я изменил несколько вещей (все еще безуспешно), но некоторые вещи обнаруживаются. Объекты соединений, выполненные в цикле и не «отключенные» через dbDisconnect, приводят к зависанию соединений, что видно из / var / log для Postgres. Несколько новых сообщений об ошибках появляются, когда я делаю это:

> system.time(
+ id.qed2.foreach <- foreach(i = 1588:1590, .inorder=FALSE, 
.packages=c("DBI", "RPostgreSQL")) %dopar% {drv0 <- dbDriver("PostgreSQL"); 
con0 <- dbConnect(drv0, dbname='nseindia');
list(idreuters=fetch(dbSendQuery(con0,eval(expr.01)),n=-1)$idreuters);
dbDisconnect(con0)})
Error in postgresqlExecStatement(conn, statement, ...) : 
  no function to return from, jumping to top level
Error in postgresqlExecStatement(conn, statement, ...) : 
  no function to return from, jumping to top level
Error in postgresqlExecStatement(conn, statement, ...) : 
  no function to return from, jumping to top level
Error in { : 
  task 1 failed - "error in evaluating the argument 'res' in selecting a method for function 'fetch'"

Ответы [ 2 ]

16 голосов
/ 08 июля 2014

Более эффективно создавать соединение с базой данных один раз на одного работника, а не один раз на задачу.К сожалению, mclapply не предоставляет механизм для инициализации рабочих перед выполнением задач, поэтому это нелегко сделать с помощью бэкэнда doMC, но если вы используете бэкэнд doParallel, вы можете инициализировать рабочих с помощью clusterEvalQ.Вот пример того, как реструктурировать код:

library(doParallel)
cl <- makePSOCKcluster(detectCores())
registerDoParallel(cl)

clusterEvalQ(cl, {
  library(DBI)
  library(RPostgreSQL)
  drv <- dbDriver("PostgreSQL")
  con <- dbConnect(drv, dbname="nsdq")
  NULL
})

id.qed.foreach <- foreach(i=1588:3638, .inorder=FALSE,
                          .noexport="con",
                          .packages=c("DBI", "RPostgreSQL")) %dopar% {
  lst <- eval(expr.01)  #contains the SQL query which depends on 'i'
  qry <- dbSendQuery(con, lst)
  tmp <- fetch(qry, n=-1)
  dt <- dates.qed2[i]
  list(date=dt, idreuters=tmp$idreuters)
}

clusterEvalQ(cl, {
  dbDisconnect(con)
})

Поскольку doParallel и clusterEvalQ используют один и тот же объект кластера cl, цикл foreach будет иметь доступ к объекту соединения с базой данных con при выполнениизадачи.

2 голосов
/ 11 октября 2010

Следующее работает и ускоряется в ~ 1,5 раза по последовательной форме. В качестве следующего шага мне интересно, возможно ли прикрепить объект подключения к каждому из рабочих, созданных registerDoMC. Если это так, то не было бы необходимости создавать / уничтожать объекты соединений, что предотвращает перегрузку сервера PostgreSQL соединениями.

pgparquery <- function(i) {
drv <- dbDriver("PostgreSQL"); 
con <- dbConnect(drv, dbname='nsdq'); 
lst <- eval(expr.01); #contains the SQL query which depends on 'i'
qry <- dbSendQuery(con,lst);
tmp <- fetch(qry,n=-1);
dt <- dates.qed2[i]
dbDisconnect(con);
result <- list(date=dt, idreuters=tmp$idreuters)
return(result)}

id.qed.foreach <- foreach(i = 1588:3638, .inorder=FALSE, .packages=c("DBI", "RPostgreSQL")) %dopar% {pgparquery(i)}

-
Вишал Белсаре

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...