Я уже некоторое время осматриваюсь, и этот - единственный ресурс, который я нашел в Интернете, связанный с моей проблемой. Я пытаюсь загрузить файлы с одного ftp-сервера, а затем загрузить их на другой ftp-сервер, один за другим, используя обещания и без необходимости сохранять файлы локально во время процесса.
Сначала я рекурсивно вызываю client.List () из модуля ftp
, чтобы получить массив путей к файлам, которые мне нужно загрузить с исходного ftp-сервера. Это отлично работает.
getRecursively(client, path) {
var _this = this;
let downloadList = [];
let paths = [];
let promise = new Promise((resolve, reject) => {
client.list(path, function(err, list) {
async function loop() {
for (var i = 0; i < list.length; i++) {
if (list[i].type == 'd') {
let _list = await _this.getRecursively(client, path + '/' + list[i].name)
downloadList = downloadList.concat(_list);
} else {
if ( list[i].name.match(/\.(jpg|jpeg)$/i) ) {
downloadList.push({path: path, name: list[i].name});
}
}
}
console.log("One complete");
resolve(downloadList);
}
loop();
})
})
return promise;
}
Далее я перебираю список путей к файлам и отправляю обещания, которые регулируются с помощью модуля es6-promise-pool
, поэтому сейчас предел его параллелизма равен 10.
Вот как выглядит каждое обещание:
getAndInsert(file) {
let _this = this;
let promise = new Promise((resolve, reject) => {
let c = new Client();
c.on('ready', () => {
let d = new Client();
d.on('ready', () => {
c.get(file.path + '/' + file.name, function(err, stream) {
if (err) {console.log(err); console.log("FILE NAME: " + file.name)}
d.put(stream.pipe(passThrough()), '/images/' + file.name, function() {
_this.uploadCount += 1;
_this.uploadedImages.push(file.name)
console.log(_this.uploadCount + '/' + _this._list.length + " uploaded.")
c.end();
d.end();
resolve(true);
});
});
})
d.on('error', (err) => {
if (err) console.log(err);
_this.onCompleteCallback();
})
d.connect(destinationFTP);
})
c.on('error', (err) => {
if (err) console.log(err);
_this.onCompleteCallback();
})
c.connect(sourceFTP);
})
return promise;
}
Каждое обещание устанавливает свое собственное соединение с исходным и целевым ftp-сервером. Я также использую объект Transform
модуля stream
, когда вызываю d.put(stream.pipe(passThrough())
. Вот эта функция.
const passThrough = () => {
var passthrough = new Transform();
passthrough._transform = function(data, encoding, done) {
this.push(data);
done();
};
return passthrough;
}
Наконец, вот основной код, который выполняет обещания.
*buildPromises(list) {
for (let i = 0; i < list.length; i++) {
yield this.getAndInsert(list[i]);
}
}
let iterator = _this.buildPromises(list);
var pool = new PromisePool(iterator, 10);
pool.start()
.then(function(){
console.log("Finished")
}).catch((err) => {
console.log(err);
console.log("error processing pool promise");
})
После этого все будет в порядке, но при отправке обещаний я получаю следующую ошибку:
Error: write after end
at writeAfterEnd (_stream_writable.js:236:12)
at Transform.Writable.write (_stream_writable.js:287:5)
at Socket.ondata (_stream_readable.js:639:20)
at emitOne (events.js:116:13)
at Socket.emit (events.js:211:7)
at Socket.Readable.read (_stream_readable.js:475:10)
at flow (_stream_readable.js:846:34)
at Transform.<anonymous> (_stream_readable.js:707:7)
at emitNone (events.js:106:13)
Это может пройти как 5, а затем выдать ошибку, а иногда и больше, но, похоже, это довольно последовательно. Я также заметил, что иногда я получаю похожие сообщения о том, что «файл уже используется», но каждый загружаемый файл имеет уникальное имя. Любая помощь приветствуется, и если вам нужна дополнительная информация, я сделаю все возможное, чтобы предоставить больше информации. Спасибо.