Скачайте файл с S3 и разделите по количеству строк - PullRequest
0 голосов
/ 03 марта 2020

Я пытаюсь загрузить файл из корзины S3 и разбить его на куски по 500000 строк, каждая из которых сохраняется как отдельный файл.

Код, который я написал ниже, продолжает падать с этой ошибкой:

FATAL ERROR: CALL_AND_RETRY_LAST Allocation failed - JavaScript heap out of memory
data-import-engine_1      | 
data-import-engine_1      | <--- Last few GCs --->
data-import-engine_1      | 
data-import-engine_1      | [298:0x5637f5f1d000]   629519 ms: Mark-sweep 1452.6 (1633.7) -> 1452.5 (1596.2) MB, 1100.7 / 0.0 ms  last resort GC in old space requested
data-import-engine_1      | [298:0x5637f5f1d000]   630627 ms: Mark-sweep 1452.5 (1596.2) -> 1452.5 (1591.7) MB, 1107.6 / 0.0 ms  last resort GC in old space requested
data-import-engine_1      | 
data-import-engine_1      | 
data-import-engine_1      | <--- JS stacktrace --->
data-import-engine_1      | 
data-import-engine_1      | ==== JS stack trace =========================================
data-import-engine_1      | 
data-import-engine_1      | Security context: 0xee0238255e9 <JSObject>
data-import-engine_1      |     1: _copyArray [/home/nodeusr/app/node_modules/denque/index.js:~395] [pc=0x33c25bba7a4b](this=0x3336cceeb8a1 <Denque map = 0x23eafa0916c1>,fullCopy=0x20ed2d882371 <true>)
data-import-engine_1      |     2: _growArray [/home/nodeusr/app/node_modules/denque/index.js:416] [bytecode=0x48d2d8f8429 offset=19](this=0x3336cceeb8a1 <Denque map = 0x23eafa0916c1>)
data-import-engine_1      |     3: /* anonymous */ [/home/nodeusr/app/node_modules/ioredis/built...

Я выполняю этот код в контейнере docker с alpine-node: 9 .n Я использую Bull Queue для обработки этих заданий в процессе песочницы (https://github.com/OptimalBits/bull#documentation)

Я уже пытался увеличить память для механизма docker и увеличил лимит памяти для процесса узла, но мне не повезло в решении этой проблемы.

const AWS = require('aws-sdk')
const fs = require('fs')
// var LineByLineReader = require('line-by-line')
const es = require('event-stream')

var s3 = new AWS.S3({
  region: process.env.DEFAULT_REGION || 'eu-west-2',
  accessKeyId: process.env.STORAGE_API_KEY || 'somekey',
  secretAccessKey: process.env.STORAGE_API_SECRET || 'somesecret',
  endpoint: (process.env.STORAGE_HOST && process.env.STORAGE_PORT) ? process.env.STORAGE_HOST + ':' + process.env.STORAGE_PORT : 'http://localstack:4572'
})

module.exports = (job) => {
  var dsConfig = job.data.dsConfig
  var totalBytes = 0

  var params = {
    Bucket: process.env.STORAGE_BUCKET_NAME || 'fcd_bucket',
    Key: dsConfig.resourceId
  }

  return new Promise((resolve, reject) => {
    s3.headObject(params, function (err, data) {
      if (err) reject(err)

      totalBytes = data.ContentLength
      var d = job.data
      if (typeof (job.data.progress) === 'undefined') {
        d.progress = 0
      }
      if (typeof (job.data.params) === 'undefined') {
        params.Range = 'bytes=0-' + totalBytes.toString()
        d.params = params
      }
      let progress = d.progress

      var chunkCounter = 0
      var totalCounter = 0
      var rowCounter = 0
      var chunkSize = 500000

      const filewriter = []
      filewriter[chunkCounter] = fs.createWriteStream('./tmp/' + d.procId + '.tmp', {
        flags: 'a', // 'a' means appending (old data will be preserved)
        encoding: 'utf8'
      })
      var outputFiles = ['./tmp/' + d.procId + '.tmp']

      function writeLine (line) {
        if (totalCounter > 0) filewriter[chunkCounter].write(line)
        if (rowCounter > chunkSize) {
          rowCounter = 0
          chunkCounter++
          filewriter[chunkCounter] = fs.createWriteStream('./tmp/' + d.procId + '-' + chunkCounter + '.tmp', {
            flags: 'a', // 'a' means appending (old data will be preserved)
            encoding: 'utf8'
          })
          outputFiles.push('./tmp/' + d.procId + '-' + chunkCounter + '.tmp')
        }
        rowCounter++
        totalCounter++
        progress += Buffer.byteLength(line, 'utf-8')
        d.params.Range = 'bytes=' + progress.toString() + '-' + totalBytes.toString()
        d.progress = progress
        job.progress(parseFloat(progress / totalBytes).toFixed(3) * 100)
        // pipeline.resume()
      }

      s3.getObject(params).createReadStream({
        encoding: 'utf8'
      }).pipe(es.split(/(\r?\n)/)).pipe(es.map((line, callback) => {
        callback(null, writeLine(line))
      }))
        .on('error', err => {
          job.update(d).then(() => {
            console.log('Error occurred during Job ' + job.id + ' execution, progress data stored so to restart from the same point')
          }).catch(err => {
            console.log(err)
          })
          reject(err)
        })
        .on('end', () => {
          filewriter.forEach(writer => {
            writer.end()
          })
          d.tempFiles = outputFiles
          job.update(d).then(() => {
            resolve()
          }).catch(err => {
            reject(err)
          })
        })
    })
  })
}

У вас есть предложение, как это исправить?

Спасибо, Salvo

1 Ответ

0 голосов
/ 07 марта 2020

Использование CloudFront с частичным запросом объектов:

Подробнее о: https://docs.aws.amazon.com/AmazonCloudFront/latest/DeveloperGuide/RangeGETs.html

...