Nodejs | распакуйте и прочитайте только верхние n строк файлов - PullRequest
1 голос
/ 14 марта 2020

У меня есть почтовый индекс в s3, который имеет сотни CSV-файлов. Я пытаюсь выполнить потоковую передачу файлов, и мне нужно прочитать первые n строк файла. Я могу разархивировать его и прочитать содержимое, но не уверен, как остановить поток, когда я закончу чтение n строк и продолжу работу с остальными файлами.

Код уже пробовал

const aws = require("aws-sdk");
const s3 = new aws.S3();
const etl = require("etl");
const unzip = require("unzip-stream");

function setupMetadata() {
  s3.getObject({Bucket: 'test', Key: 'CSV.zip'}).createReadStream()
    .pipe(unzip.Parse())
    .on('entry', function (entry) {
      var i = 0;
      var recordIdentifier;
      entry
      .pipe(etl.map(res => {
        if (recordIdentifier) {
          console.log(recordIdentifier);
          console.log(i++);
          // not sure about this. THis works but it only works for 1st file
          // after that the program terminates. I need to do that for all the
          // files in the zip
          entry.destroy(); 
        }
        const data = res.toString("utf-8");
        var array = data.toString().split("\n");
        if(array.length >= 3) {
          recordIdentifier = array[2].split(",")[0];
        }
      }))
    })
}

setupMetadata();

Я попытался позвонить entry.autodrain() после прочтения контента, но он не работает. entry.destroy() работает, но программа завершается после этого. Я хочу сделать то же самое для всех файлов в zip-архиве.

Любая помощь будет очень признательна.

Заранее спасибо.

Ответы [ 2 ]

2 голосов
/ 14 марта 2020

Я попытался повторить аналогичный случай. Я надеюсь, что вам нужно что-то вроде этого:

const etl = require("etl");
const unzip = require("unzip-stream");
const fs = require('fs');

function readRecord(entry, entNum) {

    let recordCount = 0;
    let etlcsv = entry.pipe(etl.csv())
    etlcsv.pipe(etl.map(d => {
        console.log(d);
        recordCount++;
        if (recordCount > 2) {
            etlcsv.destroy()
            entry.autodrain();
        }
    }))
}

function setupMetadata() {
    let entryCount = 0;
    let ent = {}

    let test = fs.createReadStream('csv.zip').pipe(unzip.Parse())
    test.on('entry', function(entry) {
        entryCount++;
        console.log(entryCount)
        readRecord(entry, entryCount)
    })

}

setupMetadata()

Проверьте этот REPL для теста: https://repl.it/@sandeepp2016 / PlushFatherlyMonotone

0 голосов
/ 14 марта 2020

Я сделал нечто похожее на это, используя get.

Мне пришлось адаптировать код, чтобы он не тестировался, но этот общий подход работал для меня. Я не использовал etl, поэтому я не уверен, как он обрабатывает куски файлов, а не целые файлы.

const aws = require("aws-sdk");
const s3 = new aws.S3();
const got = require('got');
const etl = require("etl");


const getAllFilesAsUrls = async(bucket_name, folder_name) => {

    const listParams = {
        Bucket: bucket_name,
        Delimiter: '/',
        StartAfter: `${folder_name}/`
    };

    const data = await s3.listObjectsV2(listParams).promise();

    const keys = data.Contents.map(object => object.key);

    const urlsArray = [];

    for (let key of keys) {

        const params = {Bucket: bucket_name, Key: key};
        let url = await s3.getSignedUrl('getObject', params).promise();
        urlsArray.push(url);

    }

    return urlsArray;


}

workEachFileUrl = (url) => {

    return new Promise((resolve, reject) => {

        //if you're looking to limit the amount of data transferred then this was a good way of doing it
        const gotstream = got.stream(url, { headers: { "accept-encoding": "gzip" } })
            .on('data', (chunk) => {
                //pause the stream as soon as we get the first chunk
                gotstream.pause();
                //do your work with the chunk, as long as etl can handle partial files, then resolve with the first few lines
                //otherwise just use the 'response' event as you were with                     const parsedChunk;
                resolve(parsedChunk);
            })
            .on("error", (err) => {    
                console.log(err);
                reject(err);
            });
    });

}

runOperation = async (bucket_name, folder_name) => {

    const records = [];

    const urls = await getAllFilesAsUrls(bucket_name, folder_name);

    for (let url of urls) {

        let record = await workEachFileUrl(url);
        records.push(record);

    }

    return records;

}

const completedRecords = await runOperation(bucket_name, folder_name);
...