Попытка выполнить операторы R в многоядерной системе путем извлечения данных из БД и передачи параметров из другой таблицы в l oop
library(doParallel)
detectCores()
cl <- makeCluster(10)
registerDoParallel(cl)
getDoParWorkers()
clusterEvalQ(cl,library(RPostgreSQL))
clusterEvalQ(cl,library(DBI))
clusterEvalQ(cl,library(reshape2))
clusterEvalQ(cl,library(dplyr))
DF для передачи / фильтрации данных из БД
Col1 = c("US","US","US")
Col2 = c("FL","WC","NY")
Col3 = c("2019-02-20","2019-02-20","2019-02-20")
Col4 = c("2019-02-25","2019-02-25","2019-02-25")
add_tab = data.frame(Col1,Col2,Col3,Col4)
add_tab$Col3 =as.Date(add_tab$Col3)
add_tab$Col4 =as.Date(add_tab$Col4)
Функция для извлечения данных из БД
update <- function(i) {
drv <- dbDriver("PostgreSQL")
con <- dbConnect(drv, dbname = "db_name",host = "10.1.0.0" , port = "5433", user = "postgres", password = "postgres@123")
query = paste0("select * from my_schema.\"sales_tab\" where \"Country\" = '" ,add_tab[i,1],"' and \"State\" = '",
add_tab[i,2],"' and \"Date\">='" ,format(add_tab[i,4],"%Y-%m-%d"),"' and \"Date\"<='" ,format(add_tab[i,5],"%Y-%m-%d"),"' ")
df= dbGetQuery(con, query)
}
После извлечения операторов в foreach l oop
foreach(i = 1:nrow(add_tab),.inorder=FALSE,.packages="RPostgreSQL")%dopar%{
update(i)
df = df %>% filter(sale_type == 'online')
df= dcast(df,Country+State+city~sales_amt,fun.aggregate = sum,value.var='value')
dbWriteTable(con, c("my_schema","online_sales"), value = df, append = TRUE, row.names = FALSE))
dbDisconnect(con)
}
clusterEvalQ(cl, {
dbDisconnect(con)
})
stopCluster(cl)
Ошибка / результат, который я получаю через некоторое время "NULL".
Заранее спасибо