Я создаю модуль, который позволяет пользователям захватывать живые потоки из источников m3u8 и dash-mpd. Мой модуль успешно прошел несколько тестов и смог захватить 100% потока, как я и ожидал; однако я обнаружил, что бывают случаи, когда рекурсивная функция setTimeout «случайно» не срабатывает, как ожидалось, и модуль, похоже, просто находится в состоянии зависания.
Не было никаких исключений или ошибок, которые были показаны, и я даже добавил обширную регистрацию во всем сценарии, чтобы идентифицировать любые потенциальные ошибки, но всегда кажется, что функция setTimeout просто прекращает запуск кода внутри него. Структура проекта - это начальный индексный сценарий для определения типа целевого потока, а затем общий анализатор для запуска команд, которые можно применить к потокам m3u8 и dash-mpd, а также более специфический анализатор m3u.
index.js : инициализация синтаксического анализатора и первоначальный захват содержимого файла списка воспроизведения m3u8
const stream = require('stream');
const url = require('url');
const http = require('http');
const https = require('https');
const fs = require('fs');
const m3u = require('./parsers/m3u');
/**
* @param {String} path Path to stream playlist
* @param {Object} options Objects for configuring playlist capture
* @returns {ReadableStream} Readable stream of the playlist
*/
const hlsdl = (path, options) => {
if(path instanceof Object || path == '') throw Error('A path to an M3U or MPD stream was not provided. Please be sure to include a path to continue');
const hlsstream = new stream.PassThrough();
options = options || {};
options.path = path;
options.timeout = (options.timeout) ? options.timeout : 2500;
options.livebuffer = (options.livebuffer) ? options.livebuffer : 20000;
const host = url.parse(path);
let httpLib = null;
let parser = (options.parser) ? getParser(`.${options.parser}`) : getParser(path);
(host.protocol === 'http:') ? httpLib = http : httpLib = https;
if(host.protocol != 'http:' && host.protocol != 'https:' && host.protocol != 'file:') {
throw new Error('No protocol was included in the path provided. Please ensure an http, https, or file protocol is selected.')
}
if(host.protocol === 'file:') {
fs.readFile(host.path, (err, data) => {
if(err) throw Error('The path to the file provided does not exist. Please check again.');
let internalStream = new parser(httpLib, hlsstream, options);
internalStream.write(data);
});
} else {
const downloadPlaylist = (host) => {
httpLib.get(host, (res) => {
let internalStream = new parser(httpLib, hlsstream, options);
let responseBody = '';
if (res.statusCode >= 500 || res.statusCode < 200) throw Error(`The path provided returned a ${res.statusCode} status code. Please ensure the request resource is available for access before continuing.`);
res.on('error', (err) => { console.log(err); downloadPlaylist(host) });
if (res.statusCode === 200) {
console.log('success');
res.on('data', chunk => {
responseBody += chunk;
});
res.on('end', () => {
internalStream.write(responseBody);
});
} else {
res.on('data', chunk => console.log(chunk.toString()));
}
}).on('error', (err) => downloadPlaylist(host));
}
downloadPlaylist(host);
}
return hlsstream;
};
const getParser = (path) => {
if(RegExp('.m3u').test(path)) return m3u;
throw Error('No compatible HLS stream type detected. Please ensure you\'ve provided a direct link to an M3U or MPD stream.');
};
module.exports = hlsdl;
После того, как index.js определит, какой парсер нужно использовать, он создает парсер на let internalStream = new parser(httpLib, hlsstream, options);
. Парсер, над которым я сейчас работаю, это парсер m3u.
m3u.js : синтаксический анализатор для обработки списков воспроизведения в реальном времени в формате m3u / m3u8. каждый синтаксический анализатор основан на универсальном синтаксическом анализаторе
const GenericParser = require('./generic');
const url = require('url');
module.exports = class m3u extends GenericParser {
constructor(httpLib, hlsstream, options) {
super(httpLib, hlsstream, options);
}
_write(chunk) {
let hasNoNewSegments = true;
this.currentSources = 0, this.completed = 0;
const playlist = chunk.toString().trim().split('\n');
this.hlsstream.emit('status', 'Parsing through playlist');
for (let index = 0; index < playlist.length; index++) {
if (this._parse(playlist[index]) == false) break;
if (playlist[index][0] !== '#') {
if (Object.values(this.chunks).every(segment => segment.link == url.resolve(this.path, playlist[index])) == true) {
hasNoNewSegments = false;
}
}
}
if (hasNoNewSegments == true || this.live == true) {
this.refreshAttempts++;
this.hlsstream.emit('status', 'Fetching next playlist.....');
this._fetchPlaylist();
} else {
this.refreshAttempts = 0;
}
if(this.refreshAttempts > 1) {
this.hlsstream.emit('status', '---------------------');
this.hlsstream.emit('status', 'Dumping fetched m3u8 file');
this.hlsstream.emit('status', chunk);
this.hlsstream.emit('status', '---------------------');
}
this.hlsstream.emit('status', 'Playlist parsing complete');
}
_parse(line) {
let value = line;
let info = null;
if(line[0] == '#' && line.indexOf(':') != -1) {
value = line.split(':')[0];
info = line.split(':')[1];
}
switch(value) {
case('#EXT-X-MEDIA-SEQUENCE'):
if(info < this.sequence) return false;
this.sequence = parseInt(info);
break;
case('#EXTINF'):
this.duration += parseFloat(info.split(',')[0]);
break;
case('#EXT-X-ENDLIST'):
this.live = false;
break;
default:
if(value[0] != '#') {
if(!Object.values(this.chunks).some(x => x.link == url.resolve(this.path, value))) {
this.currentSources++;
this._download(url.resolve(this.path, value), this.sources++);
}
}
break;
}
}
};
Однако, как упоминалось в описании парсера m3u.js, каждый парсер использует универсальный парсер в качестве своей базы.
generic.js : предоставляет общую базу разбора для всех форматов синтаксического анализатора, включая логику загрузки отдельных аудиосегментов и захвата содержимого списка воспроизведения каждые 20 секунд
const stream = require('stream');
const url = require('url');
module.exports = class GenericParser extends stream.Duplex {
constructor(httpLib, hlsstream, options) {
super();
this.hlsstream = hlsstream
this.httpLib = httpLib;
this.options = options;
this.path = options.path;
this.sources = 0, this.completed = 0, this.currentSources;
this.sequence = 0;
this.totalsize = 0;
this.chunks = {};
this.downloading = false;
this.live = true;
this.refreshAttempts = 0, this.playlistRefresh = null, this.timerSet = false;
this.pipe(hlsstream, {
end: false
});
this.on('end', () => {
this.hlsstream.end();
});
}
_read() {}
_write() {}
_download(link, index) {
this.downloading = true, this.refreshAttempts = 0;
this.hlsstream.emit('status', `Downloading segment ${index}`);
let req = this.httpLib.get(link, (res) => {
let timeout = setTimeout(() => {
req.abort();
this.completed--;
this.hlsstream.emit('issue', `02: Failed to retrieve segment on time. Attempting to fetch segment again. [${index}]`);
this._download(url.resolve(this.path, link), index);
}, this.options.timeout);
if(res.statusCode >= 400 || res.statusCode < 200) {
this.hlsstream.emit('issue', `01B: An error occurred when attempting to retrieve a segment file. Attempting to fetch segment again. [${index}]`);
this._download(url.resolve(this.path, link), index);
}
let body = [];
res.on('data', (chunk) => {
this.totalsize += chunk.length;
body.push(chunk);
});
res.on('error', (err) => {
this.hlsstream.emit('issue', `01C: An error occurred when attempting to retrieve a segment file. Attempting to fetch segment again. [${index}]`);
this._download(url.resolve(this.path, link), index);
});
res.on('end', () => {
clearTimeout(timeout);
this.completed++;
this.hlsstream.emit('status', 'Completed download of segment ' + index);
this.chunks[index] = { link: url.resolve(this.path, link), buffer: Buffer.concat(body) };
if(this.completed === this.currentSources) this._save();
});
});
req.on('error', (err) => {
this.hlsstream.emit('issue', `01A: An error occurred when attempting to retrieve a segment file. Attempting to fetch segment again. [${index}]`);
this._download(url.resolve(this.path, link), index);
});
}
_save() {
this.hlsstream.emit('status', 'Pushing segments to stream');
let index = Object.values(this.chunks).length - this.currentSources;
let length = Object.values(this.chunks).length;
try {
for (index; index < length; index++) {
this.push(this.chunks[index].buffer);
this.hlsstream.emit('segment', {
id: index,
url: url.resolve(this.path, this.chunks[index].link),
size: this.chunks[index].buffer.length,
totalsegments: this.sources - 1
});
delete this.chunks[index].buffer;
if (index == this.sources - 1 && this.live == false) {
this.hlsstream.emit('status', 'Finished pushing segments to stream');
this.downloading = false;
this.push(null);
}
}
} catch(e) {
console.log(e);
this.hlsstream.emit('issue', 'A critical error occurred when writing to stream. Dumping chunk data now:');
this.hlsstream.emit('issue', JSON.stringify(this.chunks));
}
}
_fetchPlaylist() {
if (this.live == true) {
this.hlsstream.emit('status', 'Refresh Attempts: ' + this.refreshAttempts);
if (this.refreshAttempts < 5) {
this.hlsstream.emit('status', 'Setting setTimeout in _fetchPlaylist()...');
this.playlistRefresh = setTimeout(() => {
this.timerSet = false;
this.hlsstream.emit('status', `_fetchPlaylist setTimeout triggered...[${new Date()}]`);
let req = this.httpLib.get(this.path, (res) => {
let responseBody = '';
let timeout = setTimeout(() => {
req.abort();
this.hlsstream.emit('issue', '05: Failed to retrieve playlist on time. Attempting to fetch playlist again.');
this.refreshAttempts++;
this._fetchPlaylist();
}, this.options.timeout);
res.on('error', (err) => {
this.hlsstream.emit('issue', '03A: An error occurred on the response when attempting to fetch the latest playlist: ' + err);
this.refreshAttempts++;
this._fetchPlaylist()
});
if (res.statusCode === 200) {
res.on('data', chunk => {
responseBody += chunk;
});
res.on('end', () => {
clearTimeout(timeout);
this._write(responseBody);
});
} else {
this.hlsstream.emit('issue', '04: Fetching playlist returned an HTTP code other than 200: ' + res.statusCode);
}
});
req.on('error', (err) => {
this.hlsstream.emit('issue', '03B: An error occurred on the request when attempting to fetch the latest playlist: ' + err);
this.refreshAttempts++;
this._fetchPlaylist();
});
}, this.options.livebuffer);
this.timerSet = true;
console.log(this.playlistRefresh);
console.log(this.timerSet);
this.hlsstream.emit('status', 'SetTimeout object: ' + this.playlistRefresh);
this.hlsstream.emit('status', 'Is setTimeout live?: ' + this.timerSet);
} else {
this.hlsstream.emit('status', 'Live stream completed');
this.push(null);
}
}
}
};
Теперь корень проблемы заключается в том, что setTimeout не запускает анонимную функцию в нем, как ожидалось. Логика setTimeout находится в функции _fetchPlaylist()
в generic.js и вызывается в функции _write()
внутри файла m3u.js .
Я не могу понять, почему setTimeout случайно не срабатывает, и был бы признателен за любую помощь или идеи, которые я могу получить.
Заранее спасибо за помощь.