Узел FTP скачивает файлы с одного сервера и загружает на другой сервер - PullRequest
0 голосов
/ 31 октября 2018

Я уже некоторое время осматриваюсь, и этот - единственный ресурс, который я нашел в Интернете, связанный с моей проблемой. Я пытаюсь загрузить файлы с одного ftp-сервера, а затем загрузить их на другой ftp-сервер, один за другим, используя обещания и без необходимости сохранять файлы локально во время процесса.

Сначала я рекурсивно вызываю client.List () из модуля ftp, чтобы получить массив путей к файлам, которые мне нужно загрузить с исходного ftp-сервера. Это отлично работает.

getRecursively(client, path) {
    var _this = this;
    let downloadList = [];
    let paths = [];
    let promise = new Promise((resolve, reject) => {
        client.list(path, function(err, list) {
            async function loop() {
                for (var i = 0; i < list.length; i++) {
                    if (list[i].type == 'd') {
                        let _list = await _this.getRecursively(client, path + '/' + list[i].name)
                        downloadList = downloadList.concat(_list);
                    } else {
                        if ( list[i].name.match(/\.(jpg|jpeg)$/i) ) {
                            downloadList.push({path: path, name: list[i].name});
                        }
                    }
                }
                console.log("One complete");
                resolve(downloadList);

            }

            loop();
        })
    })
    return promise;

}

Далее я перебираю список путей к файлам и отправляю обещания, которые регулируются с помощью модуля es6-promise-pool, поэтому сейчас предел его параллелизма равен 10.

Вот как выглядит каждое обещание:

getAndInsert(file) {
    let _this = this;
    let promise = new Promise((resolve, reject) => {
        let c = new Client();
        c.on('ready', () => {
            let d = new Client();
            d.on('ready', () => {
                c.get(file.path + '/' + file.name, function(err, stream) {
                    if (err) {console.log(err); console.log("FILE NAME: " + file.name)}
                    d.put(stream.pipe(passThrough()), '/images/' + file.name, function() {
                        _this.uploadCount += 1;
                        _this.uploadedImages.push(file.name)
                        console.log(_this.uploadCount + '/' + _this._list.length + " uploaded.")
                        c.end();
                        d.end();
                        resolve(true);
                    });

                });

            })

            d.on('error', (err) => {
                if (err) console.log(err);
                _this.onCompleteCallback();
            })

            d.connect(destinationFTP);
        })

        c.on('error', (err) => {
            if (err) console.log(err);
            _this.onCompleteCallback();
        })

        c.connect(sourceFTP);

    })
    return promise;
}

Каждое обещание устанавливает свое собственное соединение с исходным и целевым ftp-сервером. Я также использую объект Transform модуля stream, когда вызываю d.put(stream.pipe(passThrough()). Вот эта функция.

    const passThrough = () => {
       var passthrough = new Transform();
       passthrough._transform = function(data, encoding, done) {
           this.push(data);
           done();
       };
       return passthrough;
   }

Наконец, вот основной код, который выполняет обещания.

*buildPromises(list) {
    for (let i = 0; i < list.length; i++) {
        yield this.getAndInsert(list[i]);
    }
}

let iterator = _this.buildPromises(list);
var pool = new PromisePool(iterator, 10);
pool.start()
    .then(function(){
        console.log("Finished")
    }).catch((err) => {
        console.log(err);
        console.log("error processing pool promise");
    })

После этого все будет в порядке, но при отправке обещаний я получаю следующую ошибку:

Error: write after end
at writeAfterEnd (_stream_writable.js:236:12)
at Transform.Writable.write (_stream_writable.js:287:5)
at Socket.ondata (_stream_readable.js:639:20)
at emitOne (events.js:116:13)
at Socket.emit (events.js:211:7)
at Socket.Readable.read (_stream_readable.js:475:10)
at flow (_stream_readable.js:846:34)
at Transform.<anonymous> (_stream_readable.js:707:7)
at emitNone (events.js:106:13)

Это может пройти как 5, а затем выдать ошибку, а иногда и больше, но, похоже, это довольно последовательно. Я также заметил, что иногда я получаю похожие сообщения о том, что «файл уже используется», но каждый загружаемый файл имеет уникальное имя. Любая помощь приветствуется, и если вам нужна дополнительная информация, я сделаю все возможное, чтобы предоставить больше информации. Спасибо.

1 Ответ

0 голосов
/ 01 ноября 2018

Итак, я нашел решение. По моему getAndInsert() функция делала:

c.get(file.path + '/' + file.name, function(err, stream) {
    if (err) {console.log(err); console.log("FILE NAME: " + file.name)}
    d.put(stream.pipe(passThrough()), '/images/' + file.name, function() {
       _this.uploadCount += 1;
       _this.uploadedImages.push(file.name)
       console.log(_this.uploadCount + '/' + _this._list.length + " uploaded.")
       c.end();
       d.end();
       resolve(true);
    });
});

Проблема состояла в stream.pipe(passThrough()). Казалось, что я пытался писать после того, как stream уже закончился. Вот что решило мою проблему:

let chunks = [];
stream.on('data', (chunk) => {
     chunks.push(chunk);
})
stream.on('end', () => {
   d.put(Buffer.concat(chunks), '/images/' + file.name, function() {
      _this.uploadCount += 1;
      _this.uploadedImages.push(file.name)
      console.log(_this.uploadCount + '/' + _this._list.length + " uploaded.")
      c.end();
      d.end();
      resolve(true);
   });
})

Когда из потока доступны новые данные, отправьте их в массив с именем chunks. Когда поток закончится, позвоните .put и введите Buffer.concat(chunks)

Надеюсь, это поможет любому, кто столкнется с подобной проблемой.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...