Я пишу приложение nodeJS, которое загружает большие файлы CSV (> 1 млн строк) из корзины S3, разбивает их на фрагменты и сохраняет фрагменты в файлы CSV меньшего размера для дальнейшей обработки.
My приложение запускается в контейнер docker, и у меня возникают проблемы с памятью.
Это ошибка, которую я получаю.
Job error occurred! Error: write ENOBUFS
at afterWriteDispatched (internal/stream_base_commons.js:156:25)
at writevGeneric (internal/stream_base_commons.js:139:3)
at Socket._writeGeneric (net.js:783:11)
at Socket._writev (net.js:792:8)
at doWrite (_stream_writable.js:375:12)
at clearBuffer (_stream_writable.js:509:5)
at onwrite (_stream_writable.js:418:7)
at WriteWrap.onWriteComplete [as oncomplete] (internal/stream_base_commons.js:103:10) {
errno: -105,
code: 'ENOBUFS',
syscall: 'write'
}
Вот мой код:
const readline = require('readline')
module.exports = (job) => {
var dsConfig = job.data.dsConfig
var totalBytes = 0
var procId = job.data.procId
var params = {
Bucket: process.env.STORAGE_BUCKET_NAME || 'xx_bucket',
Key: dsConfig.resourceId
}
return new Promise((resolve, reject) => {
totalBytes = fs.statSync('./tmp/' + procId + '-preload.tmp').size
var d = job.data
if (typeof (job.data.progress) === 'undefined') {
d.progress = 0
}
if (typeof (job.data.params) === 'undefined') {
params.Range = 'bytes=0-' + totalBytes.toString()
d.params = params
}
let progress = d.progress
var chunkCounter = 0
var totalCounter = 0
var rowCounter = 0
var chunkSize = 500000
const filewriters = []
filewriters[chunkCounter] = fs.createWriteStream('./tmp/' + d.procId + '.tmp', {
flags: 'a', // 'a' means appending (old data will be preserved)
encoding: 'utf8'
})
var outputFiles = ['./tmp/' + d.procId + '.tmp']
function writeLine (line) {
return new Promise((resolve, reject) => {
try {
if (totalCounter > 0) {
filewriters[chunkCounter].write(line + '\n')
}
if (rowCounter > chunkSize) {
rowCounter = 0
chunkCounter++
filewriters[chunkCounter] = fs.createWriteStream('./tmp/' + d.procId + '-' + chunkCounter + '.tmp', {
flags: 'a',
encoding: 'utf8'
})
outputFiles.push('./tmp/' + d.procId + '-' + chunkCounter + '.tmp')
}
rowCounter++
totalCounter++
progress += Buffer.byteLength(line + '\n', 'utf-8')
d.params.Range = 'bytes=' + progress.toString() + '-' + totalBytes.toString()
d.progress = progress
job.progress(parseFloat(progress / totalBytes).toFixed(3) * 100)
job.update(d)
resolve()
} catch (error) {
reject(error)
}
})
}
try {
var lr = readline.createInterface({
input: fs.createReadStream('./tmp/' + procId + '-preload.tmp'),
crlfDelay: Infinity
})
lr.on('line', async function (line) {
// pause emitting of lines...
lr.pause()
await writeLine(line)
lr.resume()
})
lr.on('close', function () {
// All lines are read, file is closed now.
filewriters.forEach(writer => {
writer.end()
})
setTimeout(function () {
fs.unlink('./tmp/' + procId + '-preload.tmp')
}, 60000)
d.tempFiles = outputFiles.reverse()
job.update(d).then(() => {
resolve()
}).catch(err => {
reject(err)
})
})
} catch (err) {
job.update(d).then(() => {
reject(err)
}).catch(err => {
console.log('Error updating job data', err)
reject(err)
})
}
})
}
Я запускаю это в контейнер docker с mhart / alpine-node: 14 и docker -compose. Я уже пытался увеличить лимит памяти, добавив:
- NODE_OPTIONS = - max-old-space-size = 8192
к моему docker - составить файл.
Я почти уверен, что что-то не так в том, как я пишу строки в файлах, но я не совсем понял, в чем проблема. Есть ли у кого-нибудь предложения?
Спасибо, Salvo