Хорошо, вот тизер мозга ...
pipeFlow :
export function main(event) {
let s3 = new AWS.S3();
let data = event;
const fileName = data.file;
const params = {
Bucket: config.s3.inputBucketName,
Key: fileName
};
const fileStream = s3.getObject(params).createReadStream();
const parser = csv.parse({
skip_empty_lines: false,
auto_parse: false,
columns: true,
delimiter: ','
});
const transformer = csv.transform((data) => {
const keys = Object.keys(data);
const values = _.values(data);
if (!_.isEqual(keys, values)) {
return doSomething(data, param1, param2).then(() => {
/* eslint-disable no-console */
// console.log(result);
}).catch((e) => {
console.error(e);
});
}
},
{ parallel: 1 }
);
const stringifier = csv.stringify();
fileStream
.pipe(parser).on('error', (error) => {
console.log('parser error');
console.log(error);
}).pipe(transformer).on('error', (error) => {
console.log('transformer error');
console.log(error);
}).pipe(stringifier).on('error', (error) => {
console.log('stringifier error');
console.log(error);
});
}
eventFlow :
export function main(event) {
let s3 = new AWS.S3();
let data = event;
let streamData;
let parserData;
let transformData;
const fileName = data.file;
const params = {
Bucket: config.s3.inputBucketName,
Key: fileName
};
const fileStream = s3.getObject(params).createReadStream();
const parser = csv.parse({
skip_empty_lines: false,
auto_parse: false,
columns: true,
delimiter: ','
});
const transformer = csv.transform((data) => {
const keys = Object.keys(data);
const values = _.values(data);
if (!_.isEqual(keys, values)) {
return doSomething(data, param1, param2).then(() => {
/* eslint-disable no-console */
// console.log(result);
}).catch((e) => {
console.error(e);
});
}
},
{ parallel: 1 }
);
const stringifier = csv.stringify();
fileStream.on('readable', function () {
while ((streamData = fileStream.read())) {
parser.write(streamData);
}
});
parser.on('readable', function () {
while ((parserData = parser.read())) {
transformer.write(parserData);
// stringifier.write(data);
}
});
transformer.on('readable', function () {
while ((transformData = transformer.read())) {
stringifier.write(transformData);
}
});
}
Итак, pipeFlow работает, но выводит только 32785 записей перед выходом без ошибок, предупреждений или чего-то странного.
eventFlow работает отлично и выводит все записи независимо от того,размер файла.
Мой вопрос: «Что, черт возьми, парень?»
Чтение в Интернете, кажется, указывает на правильный и, на мой взгляд, более легкий для чтения подход, использующий каналы.Чего мне не хватает в связи с тем, что каналы завершаются рано без ошибок.
Кроме того, потоковая передача из локального файла (с использованием узлов fs) вместо потоковой передачи из s3, по-видимому, позволяет ОБА потокам работать правильно.
Кроме того, я пробовал несколько файлов и видел эту проблему раньше, когда создавал большой экспортер баз данных при потоковой передаче данных из AWS.