Как загрузить несколько ссылок из CSV-файла, используя многопоточность в node.js? - PullRequest
0 голосов
/ 06 октября 2018

Я пытаюсь загрузить ссылки из CSV-файла и сохранить загруженные файлы в папке.Для этого я использовал многопоточную библиотеку, т.е. mt-files-downloader .Файлы загружаются нормально, но скачивание около 313 файлов занимает слишком много времени.Максимальный размер этих файлов - около 400 КБ.Когда я пытался использовать обычную загрузку с использованием узла, я мог загрузить их через минуту или две, но с этой библиотекой загрузка должна быть быстрой, так как я использую многопоточную библиотеку, но это занимает много времени.Ниже мой код, любая помощь будет полезна.Спасибо!

var rec;


csv
    .fromStream(stream, { headers: ["Recording", , , , , , , ,] })
    .on("data",  function (records) {
        rec = records.Recording;
        //console.log(rec);
         download(rec);

    })


    .on("end", function () {
        console.log('Reading complete')
    });

  function download(rec) {

    var filename = rec.replace(/\//g, '');
    var filePath = './recordings/'+filename;
    var downloadPath = path.resolve(filePath)
    var fileUrl = 'http:' + rec;

    var downloader = new Downloader();
    var dl = downloader.download(fileUrl, downloadPath);
        dl.start();   

        dl.on('error', function(dl) { 
            var dlUrl = dl.url;
            console.log('error downloading = > '+dl.url+' restarting download....');

            if(!dlUrl.endsWith('.wav') && !dlUrl.endsWith('Recording')){
                console.log('resuming file download => '+dlUrl);
                dl.resume();
            }

        });


}

1 Ответ

0 голосов
/ 08 ноября 2018

Вы правы, загрузка 313 файлов размером 400 КБ не займет много времени - и я не думаю, что это связано с вашим кодом - может быть, соединение плохое?Вы пытались загрузить один файл через curl?

В любом случае в вашем подходе я вижу две проблемы, с которыми я могу помочь:

  • сначала - вы загружаете все файлы нав то же время (что может привести к некоторым издержкам на сервере)
  • секунда - обработка ошибок будет выполняться в цикле, не ожидая и не проверяя фактический файл, поэтому, если есть 404 - вы заполняете сервер запросами.

Использование потоков с событиями on('data') имеет существенный недостаток - выполнение всех фрагментов более или менее синхронно по мере их чтения.Это означает, что ваш код будет исполнять то, что находится в обработчике on('data'), никогда не ожидая завершения ваших загрузок.Единственным ограничивающим фактором является то, насколько быстро сервер может читать csv - и я ожидаю, что миллионы строк в секунду будут нормальными.

С точки зрения сервера, вы просто запрашиваете 313 файлов одновременно,в результате чего, не желая спекулировать действительными техническими механизмами сервера, некоторые из этих запросов ожидают и мешают друг другу.

Это можно решить с помощью потоковой среды, такой как scramjet,Например, event-steram или highland.Я являюсь автором первого, и это ИМХО самое простое в этом случае, но вы можете использовать любое из них, немного изменив код, чтобы соответствовать их API - во всяком случае, оно довольно похоже во всех случаях.

Вот код с сильными комментариями, который будет запускать пару загрузок параллельно:

const {StringStream} = require("scramjet");
const sleep = require("sleep-promise");
const Downloader = require('mt-files-downloader');

const downloader = new Downloader();

const {StringStream} = require("scramjet");
const sleep = require("sleep-promise");
const Downloader = require('mt-files-downloader');

const downloader = new Downloader();

// First we create a StringStream class from your csv stream
StringStream.from(csvStream)
    // we parse it as CSV without columns
    .CSVParse({header: false})
    // we set the limit of parallel operations, it will get propagated.
    .setOptions({maxParallel: 16})
    // now we extract the first column as `recording` and create a
    // download request.
    .map(([recording]) => {
        // here's the first part of your code
        const filename = rec.replace(/\//g, '');
        const filePath = './recordings/'+filename;
        const downloadPath = path.resolve(filePath)
        const fileUrl = 'http:' + rec;

        // at this point we return the dl object so we can keep these
        // parts separate.
        // see that the download hasn't been started yet
        return downloader.download(fileUrl, downloadPath);
    })
    // what we get is a stream of not started download objects
    // so we run this asynchronous function. If this returns a Promise
    // it will wait
    .map(
        async (dl) => new Promise((res, rej) => {
            // let's assume a couple retries we allow
            let retries = 10;

            dl.on('error', async (dl) => {
                try {
                    // here we reject if the download fails too many times.
                    if (retries-- === 0) throw new Error(`Download of ${dl.url} failed too many times`);

                    var dlUrl = dl.url;
                    console.log('error downloading = > '+dl.url+' restarting download....');

                    if(!dlUrl.endsWith('.wav') && !dlUrl.endsWith('Recording')){
                        console.log('resuming file download => '+dlUrl);
                        // lets wait half a second before retrying
                        await sleep(500);
                        dl.resume();
                    }
                } catch(e) {
                    // here we call the `reject` function - meaning that 
                    // this file wasn't downloaded despite retries.
                    rej(e);
                }
            });
            // here we call `resolve` function to confirm that the file was
            // downloaded.
            dl.on('end', () => res());
        })
    )
    // we log some message and ignore the result in case of an error
    .catch(e => {
        console.error('An error occured:', e.message);
        return;
    })
    // Every steram must have some sink to flow to, the `run` method runs
    // every operation above.
    .run();

Вы также можете использовать поток, чтобы вытолкнуть какие-то сообщения журнала и использовать pipe(process.stderr) вконец, вместо этих console.logs.Пожалуйста, проверьте документацию scramjet для получения дополнительной информации и документ Mozilla об асинхронных функциях

...