Node.JS Пары с трубкой против событий - PullRequest
0 голосов
/ 02 февраля 2019

Хорошо, вот тизер мозга ...

pipeFlow :

export function main(event) {
let s3 = new AWS.S3();
let data = event;
const fileName = data.file;

const params = {
    Bucket: config.s3.inputBucketName,
    Key: fileName
};

const fileStream = s3.getObject(params).createReadStream();

const parser = csv.parse({
    skip_empty_lines: false,
    auto_parse: false,
    columns: true,
    delimiter: ','
});

const transformer = csv.transform((data) => {
    const keys = Object.keys(data);
    const values = _.values(data);
    if (!_.isEqual(keys, values)) {
        return doSomething(data, param1, param2).then(() => {
            /* eslint-disable no-console */
            // console.log(result);
        }).catch((e) => {
            console.error(e);
        });
    }
},
{ parallel: 1 }
);

const stringifier = csv.stringify();

fileStream
    .pipe(parser).on('error', (error) => {
        console.log('parser error');
        console.log(error);
    }).pipe(transformer).on('error', (error) => {
        console.log('transformer error');
        console.log(error);
    }).pipe(stringifier).on('error', (error) => {
        console.log('stringifier error');
        console.log(error);
    });
}

eventFlow :

export function main(event) {
let s3 = new AWS.S3();
let data = event;
let streamData;
let parserData;
let transformData;
const fileName = data.file;

const params = {
    Bucket: config.s3.inputBucketName,
    Key: fileName
};

const fileStream = s3.getObject(params).createReadStream();

const parser = csv.parse({
    skip_empty_lines: false,
    auto_parse: false,
    columns: true,
    delimiter: ','
});

const transformer = csv.transform((data) => {
    const keys = Object.keys(data);
    const values = _.values(data);
    if (!_.isEqual(keys, values)) {
        return doSomething(data, param1, param2).then(() => {
            /* eslint-disable no-console */
            // console.log(result);
        }).catch((e) => {
            console.error(e);
        });
    }
},
{ parallel: 1 }
);

const stringifier = csv.stringify();

fileStream.on('readable', function () {
   while ((streamData = fileStream.read())) {
        parser.write(streamData);
    }
});

parser.on('readable', function () {
    while ((parserData = parser.read())) {
        transformer.write(parserData);
        // stringifier.write(data);
    }
});

transformer.on('readable', function () {
    while ((transformData = transformer.read())) {
        stringifier.write(transformData);
    }
});
}

Итак, pipeFlow работает, но выводит только 32785 записей перед выходом без ошибок, предупреждений или чего-то странного.

eventFlow работает отлично и выводит все записи независимо от того,размер файла.

Мой вопрос: «Что, черт возьми, парень?»

Чтение в Интернете, кажется, указывает на правильный и, на мой взгляд, более легкий для чтения подход, использующий каналы.Чего мне не хватает в связи с тем, что каналы завершаются рано без ошибок.

Кроме того, потоковая передача из локального файла (с использованием узлов fs) вместо потоковой передачи из s3, по-видимому, позволяет ОБА потокам работать правильно.

Кроме того, я пробовал несколько файлов и видел эту проблему раньше, когда создавал большой экспортер баз данных при потоковой передаче данных из AWS.

...