Мне интересно, как правильно обрабатывать несколько потоков чтения асинхронно, чтобы я мог объединить результаты в один файл.Потоки чтения поступают из файлов на SFTP-сервере, я использую библиотеку с именем ssh2-sftp-client
для чтения.
Это то, что я хочу сделать.
const fs = require('fs');
const Client = require('ssh2-sftp-client');
const concat = require('concat-stream');
const sftp = new Client();
sftp.connect(configObj).then(() => {
const fileNames = ['file1', 'file2'];
const streamPromiseArr = [];
// Loop through the files
for (fileName of fileNames) {
// Collect the promises (each promise resolves to a final buffer)
const streamPromise = new Promise((resolve, reject) => {
sftp.get(fileName, true, 'utf8').then((stream) => {
stream
.pipe(decrypt)
.pipe(unzipper.Parse())
.pipe(selectFile)
.pipe(filterArray)
.pipe(concat(res => resolve(res)))
})
});
streamPromiseArr.push(streamPromise);
}
// Collect all the buffers and write them to a file
Promise.all(streamPromiseArr).then((buffArr) => {
const finalBuff = Buffer.concat(buffArr);
fs.writeFile('consolidate_file.csv', finalBuff, ((err) => {
if (err) console.log(err);
}));
}).then(() => sftp.end());
})
Это даетмне ошибка:
TypeError: cb is not a function
at afterWrite (_stream_writable.js:454:3)
at onwrite (_stream_writable.js:445:7)
at /Users/jonlamb/projects/node-test/node_modules/unzipper/lib/PullStream.js:59:60
at afterWrite (_stream_writable.js:454:3)
at _combinedTickCallback (internal/process/next_tick.js:144:20)
at process._tickCallback (internal/process/next_tick.js:180:9)
Я считаю, что это связано с тем, когда я звоню sftp.end()
.Я не знаю, почему это так, но если я приковываю sftp.end()
к концу обещания sftp.get()
, я не получаю сообщение об ошибке, но затем соединение закрывается для второго цикла, поэтому происходит сбой.