У меня есть файл данных размером ~ 120 гигабайт, разделенный каналом (|), который я обрабатываю с использованием R. Все, что я делаю, - это чтение в файле кусками, удаление некоторых столбцов и запись данных обратно в отдельные файлы. , В конце моего кода я объединяю все разделенные файлы в один.
Я хотел ускорить свои операции, поэтому я распараллелил часть этого с помощью пакета snowfall. Однако этот код потребляет всю память на машине, и это не потому, что я запускаю слишком много процессов одновременно. Я нашел обходной путь, чтобы предотвратить использование памяти, но я не знаю, почему память использовалась для начала.
Для контекста, это на компьютере Windows Server 2016, с использованием Microsoft R Open 3.5.1.
Вот код, вызывающий утечку памяти:
require(data.table)
require(LaF)
require(dplyr)
require(snowfall)
output_file <- "output.txt"
input_file = "input.txt"
# this is determined via nrow(fread(input_file, select = 1L))
# it does use a lot of memory, but that's not the problem I'm asking about here
number_rows <- 46414461
batch_size <- 1e5
num_batches <- ceiling(number_rows/batch_size)
header <- colnames(fread(input_file, sep="|", header=T, nrows=1))
model <- detect_dm_csv(input_file, sep="|", header=TRUE)
drop_names <- c("a","b","c")
processFile <- function(k)
{
require(LaF)
require(data.table)
require(dplyr)
data.laf <- laf_open(model)
goto(data.laf, batch_size*(k-1)+1)
data <- next_block(data.laf, nrows=batch_size)
output <- data %>% select(-c(drop_names))
fwrite(output, paste0(output_file, k), sep= "|", col.names=T, quote=F)
rm(list=ls()) # these didn't help prevent the leak, sadly
gc()
}
# this where the workaround code shown at the end of the post would go
sfInit(parallel=T, cpus=4)
sfExportAll(except='processFile')
sfLapply(seq(num_batches), processFile)
sfStop()
for (i in seq(num_batches))
{
outputName <- paste0(output_file, i)
inputData <- fread(outputName, sep='|')
fwrite(inputData, file=output_file, append=ifelse(i==1, F, T), sep='|', col.names=ifelse(i==1, T, F), quote=F)
}
Когда этот код выполняется, 4 процесса R выполняют функцию processFile. Использование памяти между четырьмя процессами, как ни странно, не одинаково. Два из этих процессов начнут работать с размером около 60 МБ и в конечном итоге вырастут до 2 ГБ, один из процессов также начнется с размера 60 МБ и увеличится примерно до 4 ГБ. Но последний процесс - это реальная проблема: он продолжает расти и расти без остановки. Я ожидаю, что использование памяти будет сбрасываться между каждым фрагментом файла, но этого не происходит. После того, как записано около 60 фрагментов файла, один процесс потребляет почти все 120 гигабайт памяти в моей системе, и я должен остановить процесс (или операционная система убивает его для меня).
Но остальные 3 процесса остаются на 2-4 гигабайтах использования. Мне не имеет смысла, почему один из процессов потребляет так много памяти, когда все они должны делать одно и то же.
Я создал обходной путь, который позволил мне обработать весь мой файл без память растет за пределы. Я изменил часть кода, которая выполняет функции распараллеливания снегопада, чтобы работать в пакетах, как показано ниже.
num_cpus <- 4
clump_size <- 20
idx <- 1
while (T)
{
start <- clump_size * (idx - 1) + 1
end <- start + clump_size - 1
if (end > num_batches)
{
end <- num_batches
}
if (start > end)
{
break
}
if (length(start:end) >= num_cpus)
{
cpus <- num_cpus
} else
{
cpus <- length(start:end)
}
sfInit(parallel=T, cpus=cpus)
sfExportAll(except='processFile')
sfLapply(start:end, processFile)
sfStop()
idx <- idx + 1
}
Этот второй подход запускает 4 процесса, запускает их для нескольких файловых блоков и затем завершает эти 4 процесса, прежде чем любой из них сможет использовать слишком много памяти. Это повторяется до тех пор, пока не будет обработан весь файл. Используя этот подход, я могу фактически увеличить количество используемых процессоров до 12 (количество ядер на сервере), не занимая более 20% памяти на машине.
Так что же дает? Есть ли утечка памяти в моем коде или в самих базовых пакетах R /?