Я пытаюсь создать небольшой конвейер потоковых компонентов для чтения данных из таблицы базы данных в конвертер JSON в CSV и результатов в файл в ZIP-архиве. Кажется, это хорошо работает, кроме записи данных в файл CSV. Файлы всегда пусты, в то время как итоговый файл, который я записываю прямо в архив, прекрасно.
Я пытался выяснить, прерывается ли поток в какой-то момент, но данные, кажется, проходят каждую часть конвейера.
Вот как выглядит основная часть кода:
//before:
//let archive: Archiver = create('zip');
//archive.pipe(fileWriterStream);
let archivePassThrough = new PassThrough();
archivePassThrough.on("data", (chunk) => {
this.logger.info("PT:" + chunk);
});
archive.append(archivePassThrough, { name: absTargetFile });
//Use pass through for piping db query results in converter
let dbPassThrough = new PassThrough();
dbPassThrough.on("data", (chunk) => {
this.logger.info(chunk);
});
//Convert JSON to CSV (stream)
let converterStream = new AsyncParser();
converterStream.fromInput(dbPassThrough).toOutput(archivePassThrough);
converterStream.processor.on("end", () => {
this.logger.info("Converting complete");
resolve(absTargetFile);
});
converterStream.processor.on("error", (err: any) => {
this.logger.info(err);
reject(err);
});
//Read data from result set
rs.setFetchSize(2048);
rs.createObjectStream().pipe(dbPassThrough)
.on('finish', function (){
if (!rs.closed) {
rs.close();
}
});
//afterwards:
//archive.finalize();
Показанная часть обернута в Обещание, которое разрешается после завершения обработки.
Архив завершается, когда все файлы были записаны.
Сам архив записывается на диск с помощью FileWriterStream.
Есть идеи, что я здесь не так делаю?