Добавление CSV в параллельном цикле без ошибок - PullRequest
1 голос
/ 10 апреля 2019

Мне нужно добавить CSV с помощью параллельного цикла, и мне было интересно, есть ли возможность сделать это без ошибок.

В основном мне нужно обработать много данных, и выЯ не могу поместить все это в память, поэтому мне нужно добавить результаты.В цикле lapply это будет длиться вечно, поэтому я использую пакет pbapply.Но при добавлении файлов, так как часто два ядра будут добавляться одновременно, это портит конфигурацию csv.

Я предполагаю, что есть какой-то способ заблокировать соединение с файлом, пока какой-то кластер обрабатывает его, и просто заставить другие кластеры немного подождать, когда это соединение будет закрыто, чтобы повторить попытку, но я не смогнайдите способ сделать это.

Вот пример типа ошибки, которую я получаю:

library(parallel)
library(pbapply)
library(data.table)

write_random_thing <- function(x){
  require(data.table)

  y <- data.table(A = x, B = round(rnorm(10)*100,2))

  pth <- 'example.csv'
  fwrite(y, pth, append = TRUE)

  y
}

cl <- makeCluster(4)
xx <- pblapply(1:20, cl = cl, FUN = write_random_thing)
stopCluster(cl = cl)

yy <- rbindlist(xx)

zz <- fread('example.csv') # this will usually return an error

В этом случае yy и zz должны быть одинаковыми(даже в другом порядке), но часто файл не может быть прочитан, потому что число столбцов не является постоянным.

Я искал какое-нибудь решение, в котором, если файл заблокирован, когда вы пытаетесь записатьэто, он спит в течение нескольких секунд и попробуйте снова.Существует ли что-то подобное?

Ответы [ 2 ]

0 голосов
/ 10 апреля 2019

Если вам нужно что-то писать параллельно, вам нужны блокировки, чтобы убедиться, что два процесса не пишут одновременно.

Это легко сделать в R с пакетом {flock}:

library(parallel)
library(pbapply)
library(data.table)

write_random_thing <- function(x){
  require(data.table)

  y <- data.table(A = x, B = round(rnorm(10)*100,2))

  pth <- 'example.csv'
  lock <- flock::lock(pth)
  fwrite(y, pth, append = TRUE)
  flock::unlock(lock)

  y
}

cl <- makeCluster(4)
xx <- pblapply(1:20, cl = cl, FUN = write_random_thing)
stopCluster(cl = cl)

yy <- rbindlist(xx)

zz <- fread('example.csv') # this will usually return an error
0 голосов
/ 10 апреля 2019

Я бы сделал что-то вроде этого, чтобы добавить файл параллельно-

require(doParallel)
require(doRNG)

ncores <- 7
cl <- makeCluster( ncores , outfile = "" )
registerDoParallel( cl )

res <- foreach( j = 1:100 , .verbose = TRUE , .inorder= FALSE ) %dorng%{
    d <- matrix( rnorm( 1e3 , j ) , nrow = 1 )
    conn <- file( sprintf("~/output_%d.txt" , Sys.getpid()) , open = "a" )
    write.table( d , conn , append = TRUE , col.names = FALSE )
    close( conn )
}
...