Рекурсивный setTimeout случайно останавливается - PullRequest
0 голосов
/ 11 марта 2019

Я создаю модуль, который позволяет пользователям захватывать живые потоки из источников m3u8 и dash-mpd. Мой модуль успешно прошел несколько тестов и смог захватить 100% потока, как я и ожидал; однако я обнаружил, что бывают случаи, когда рекурсивная функция setTimeout «случайно» не срабатывает, как ожидалось, и модуль, похоже, просто находится в состоянии зависания.

Не было никаких исключений или ошибок, которые были показаны, и я даже добавил обширную регистрацию во всем сценарии, чтобы идентифицировать любые потенциальные ошибки, но всегда кажется, что функция setTimeout просто прекращает запуск кода внутри него. Структура проекта - это начальный индексный сценарий для определения типа целевого потока, а затем общий анализатор для запуска команд, которые можно применить к потокам m3u8 и dash-mpd, а также более специфический анализатор m3u.

index.js : инициализация синтаксического анализатора и первоначальный захват содержимого файла списка воспроизведения m3u8

const stream = require('stream');
const url = require('url');
const http = require('http');
const https = require('https');
const fs = require('fs');

const m3u = require('./parsers/m3u');

/**
 * @param {String} path Path to stream playlist
 * @param {Object} options Objects for configuring playlist capture
 * @returns {ReadableStream} Readable stream of the playlist
 */
const hlsdl = (path, options) => {
    if(path instanceof Object || path == '') throw Error('A path to an M3U or MPD stream was not provided. Please be sure to include a path to continue');

    const hlsstream = new stream.PassThrough();
    options = options || {};
    options.path = path;
    options.timeout = (options.timeout) ? options.timeout : 2500;
    options.livebuffer = (options.livebuffer) ? options.livebuffer : 20000;

    const host = url.parse(path);

    let httpLib = null;
    let parser = (options.parser) ? getParser(`.${options.parser}`) : getParser(path);

    (host.protocol === 'http:') ? httpLib = http : httpLib = https;
    if(host.protocol != 'http:' && host.protocol != 'https:' && host.protocol != 'file:') {
        throw new Error('No protocol was included in the path provided. Please ensure an http, https, or file protocol is selected.')
    }

    if(host.protocol === 'file:') {
        fs.readFile(host.path, (err, data) => {
            if(err) throw Error('The path to the file provided does not exist. Please check again.');
            let internalStream = new parser(httpLib, hlsstream, options);
            internalStream.write(data);
        });
    } else {
        const downloadPlaylist = (host) => {
            httpLib.get(host, (res) => {
                let internalStream = new parser(httpLib, hlsstream, options);
                let responseBody = '';

                if (res.statusCode >= 500 || res.statusCode < 200) throw Error(`The path provided returned a ${res.statusCode} status code. Please ensure the request resource is available for access before continuing.`);
                res.on('error', (err) => { console.log(err); downloadPlaylist(host) });

                if (res.statusCode === 200) {
                    console.log('success');
                    res.on('data', chunk => {
                        responseBody += chunk;
                    });
                    res.on('end', () => {
                        internalStream.write(responseBody);
                    });
                } else {
                    res.on('data', chunk => console.log(chunk.toString()));
                }
            }).on('error', (err) => downloadPlaylist(host));
        }

        downloadPlaylist(host);
    }

    return hlsstream;
};

const getParser = (path) => {
    if(RegExp('.m3u').test(path)) return m3u;

    throw Error('No compatible HLS stream type detected. Please ensure you\'ve provided a direct link to an M3U or MPD stream.');
};

module.exports = hlsdl; 

После того, как index.js определит, какой парсер нужно использовать, он создает парсер на let internalStream = new parser(httpLib, hlsstream, options);. Парсер, над которым я сейчас работаю, это парсер m3u.

m3u.js : синтаксический анализатор для обработки списков воспроизведения в реальном времени в формате m3u / m3u8. каждый синтаксический анализатор основан на универсальном синтаксическом анализаторе

const GenericParser = require('./generic');
const url = require('url');

module.exports = class m3u extends GenericParser {

    constructor(httpLib, hlsstream, options) {
        super(httpLib, hlsstream, options);
    }

    _write(chunk) {
        let hasNoNewSegments = true;

        this.currentSources = 0, this.completed = 0;
        const playlist = chunk.toString().trim().split('\n');
        this.hlsstream.emit('status', 'Parsing through playlist');

        for (let index = 0; index < playlist.length; index++) {
            if (this._parse(playlist[index]) == false) break;
            if (playlist[index][0] !== '#') {
                if (Object.values(this.chunks).every(segment => segment.link == url.resolve(this.path, playlist[index])) == true) {
                    hasNoNewSegments = false;
                }
            }
        }

        if (hasNoNewSegments == true || this.live == true) {
            this.refreshAttempts++;
            this.hlsstream.emit('status', 'Fetching next playlist.....');
            this._fetchPlaylist();
        } else {
            this.refreshAttempts = 0;
        }
        if(this.refreshAttempts > 1) {
            this.hlsstream.emit('status', '---------------------');
            this.hlsstream.emit('status', 'Dumping fetched m3u8 file');
            this.hlsstream.emit('status', chunk);
            this.hlsstream.emit('status', '---------------------');
        }
        this.hlsstream.emit('status', 'Playlist parsing complete');
    }

    _parse(line) {
        let value = line;
        let info = null;

        if(line[0] == '#' && line.indexOf(':') != -1) {
            value = line.split(':')[0];
            info = line.split(':')[1];
        }

        switch(value) {
            case('#EXT-X-MEDIA-SEQUENCE'):
                if(info < this.sequence) return false;
                this.sequence = parseInt(info);
                break;
            case('#EXTINF'):
                this.duration += parseFloat(info.split(',')[0]);
                break;
            case('#EXT-X-ENDLIST'):
                this.live = false;
                break;
            default:
                if(value[0] != '#') {
                    if(!Object.values(this.chunks).some(x => x.link == url.resolve(this.path, value))) {
                        this.currentSources++;
                        this._download(url.resolve(this.path, value), this.sources++);
                    }
                }
                break;
        }
    }

}; 

Однако, как упоминалось в описании парсера m3u.js, каждый парсер использует универсальный парсер в качестве своей базы.

generic.js : предоставляет общую базу разбора для всех форматов синтаксического анализатора, включая логику загрузки отдельных аудиосегментов и захвата содержимого списка воспроизведения каждые 20 секунд

const stream = require('stream');
const url = require('url');

module.exports = class GenericParser extends stream.Duplex {

    constructor(httpLib, hlsstream, options) {
        super();
        this.hlsstream = hlsstream
        this.httpLib = httpLib;
        this.options = options;
        this.path = options.path;

        this.sources = 0, this.completed = 0, this.currentSources;
        this.sequence = 0;
        this.totalsize = 0;
        this.chunks = {};
        this.downloading = false;
        this.live = true;
        this.refreshAttempts = 0, this.playlistRefresh = null, this.timerSet = false;

        this.pipe(hlsstream, {
            end: false
        });
        this.on('end', () => {
            this.hlsstream.end();
        });
    }


    _read() {}
    _write() {}


    _download(link, index) {
        this.downloading = true, this.refreshAttempts = 0;
        this.hlsstream.emit('status', `Downloading segment ${index}`);

        let req = this.httpLib.get(link, (res) => {
            let timeout = setTimeout(() => {
                req.abort();
                this.completed--;
                this.hlsstream.emit('issue', `02: Failed to retrieve segment on time. Attempting to fetch segment again. [${index}]`);
                this._download(url.resolve(this.path, link), index);
            }, this.options.timeout);
            if(res.statusCode >= 400 || res.statusCode < 200) {
                this.hlsstream.emit('issue', `01B: An error occurred when attempting to retrieve a segment file. Attempting to fetch segment again. [${index}]`);
                this._download(url.resolve(this.path, link), index);
            }

            let body = [];
            res.on('data', (chunk) => {
                this.totalsize += chunk.length;
                body.push(chunk);
            });
            res.on('error', (err) => {
                this.hlsstream.emit('issue', `01C: An error occurred when attempting to retrieve a segment file. Attempting to fetch segment again. [${index}]`);
                this._download(url.resolve(this.path, link), index);
            });
            res.on('end', () => {
                clearTimeout(timeout);
                this.completed++;
                this.hlsstream.emit('status', 'Completed download of segment ' + index);
                this.chunks[index] = { link: url.resolve(this.path, link), buffer: Buffer.concat(body) };
                if(this.completed === this.currentSources) this._save();
            });
        });

        req.on('error', (err) => {
            this.hlsstream.emit('issue', `01A: An error occurred when attempting to retrieve a segment file. Attempting to fetch segment again. [${index}]`);
            this._download(url.resolve(this.path, link), index);
        });
    }


    _save() {
        this.hlsstream.emit('status', 'Pushing segments to stream');

        let index = Object.values(this.chunks).length - this.currentSources;
        let length = Object.values(this.chunks).length;

        try {
            for (index; index < length; index++) {
                this.push(this.chunks[index].buffer);
                this.hlsstream.emit('segment', {
                    id: index,
                    url: url.resolve(this.path, this.chunks[index].link),
                    size: this.chunks[index].buffer.length,
                    totalsegments: this.sources - 1
                });
                delete this.chunks[index].buffer;

                if (index == this.sources - 1 && this.live == false) {
                    this.hlsstream.emit('status', 'Finished pushing segments to stream');
                    this.downloading = false;
                    this.push(null);
                }
            }
        } catch(e) {
            console.log(e);
            this.hlsstream.emit('issue', 'A critical error occurred when writing to stream. Dumping chunk data now:');
            this.hlsstream.emit('issue', JSON.stringify(this.chunks));
        }
    }


    _fetchPlaylist() {
        if (this.live == true) {
            this.hlsstream.emit('status', 'Refresh Attempts: ' + this.refreshAttempts);
            if (this.refreshAttempts < 5) {
                this.hlsstream.emit('status', 'Setting setTimeout in _fetchPlaylist()...');
                this.playlistRefresh = setTimeout(() => {
                    this.timerSet = false;
                    this.hlsstream.emit('status', `_fetchPlaylist setTimeout triggered...[${new Date()}]`);
                    let req = this.httpLib.get(this.path, (res) => {
                        let responseBody = '';

                        let timeout = setTimeout(() => {
                            req.abort();
                            this.hlsstream.emit('issue', '05: Failed to retrieve playlist on time. Attempting to fetch playlist again.');
                            this.refreshAttempts++;
                            this._fetchPlaylist();
                        }, this.options.timeout);
                        res.on('error', (err) => {
                            this.hlsstream.emit('issue', '03A: An error occurred on the response when attempting to fetch the latest playlist: ' + err);
                            this.refreshAttempts++;
                            this._fetchPlaylist()
                        });

                        if (res.statusCode === 200) {
                            res.on('data', chunk => {
                                responseBody += chunk;
                            });
                            res.on('end', () => {
                                clearTimeout(timeout);
                                this._write(responseBody);
                            });
                        } else {
                            this.hlsstream.emit('issue', '04: Fetching playlist returned an HTTP code other than 200: ' + res.statusCode);
                        }
                    });
                    req.on('error', (err) => {
                        this.hlsstream.emit('issue', '03B: An error occurred on the request when attempting to fetch the latest playlist: ' + err);
                        this.refreshAttempts++;
                        this._fetchPlaylist();
                    });
                }, this.options.livebuffer);
                this.timerSet = true;

                console.log(this.playlistRefresh);
                console.log(this.timerSet);
                this.hlsstream.emit('status', 'SetTimeout object: ' + this.playlistRefresh);
                this.hlsstream.emit('status', 'Is setTimeout live?: ' + this.timerSet);
            } else {
                this.hlsstream.emit('status', 'Live stream completed');
                this.push(null);
            }
        }
    }
}; 

Теперь корень проблемы заключается в том, что setTimeout не запускает анонимную функцию в нем, как ожидалось. Логика setTimeout находится в функции _fetchPlaylist() в generic.js и вызывается в функции _write() внутри файла m3u.js .

Я не могу понять, почему setTimeout случайно не срабатывает, и был бы признателен за любую помощь или идеи, которые я могу получить.

Заранее спасибо за помощь.

...