Что вам нужно сделать, это отправлять чанки на сервер сокетов каждый раз, когда на клиенте запускается событие .on('data')
на образе Readable stream
на клиенте, а затем, когда вы получаете чанки, вы записываете их в Writeable Stream
на сторона сервера веб-сокетов.
Есть несколько вещей, которые нужно иметь в виду:
- Вам потребуется обнаружить EOF на сервере (проверка байтов EOF определенного типа) или выдать какой-либо заголовок от клиента. Пример.
const EOF = Buffer.alloc(6);
// Client Side
client.sendBytes(EOF); // on end
// Server
if(chunk.slice(-6).compare(EOF) === 0)
console.log('File EOF close write stream');
- Если вы читаете несколько изображений одновременно, вам нужно будет добавить идентификатор к каждому чанку для правильной записи на стороне сервера. Идентификатор должен всегда иметь одинаковую длину, чтобы вы могли правильно нарезать буфер на стороне сервера.
const imageOne = fs.createReadStream('./image-1.jpg');
const imageTwo = fs.createReadStream('./image-2.jpg');
// This will be mixed, and you'll end up with a broken image
imageOne.on('data', chunk => client.sendBytes(chunk)); // add identifier
imageTwo.on('data', chunk => client.sendBytes(chunk)); // add identifier
Ниже приведен пример использования пакета websocket .
Сервер
/* rest of implementation */
wsServer.on('request', function(request) {
const connection = request.accept(null, request.origin);
const JPEG_EOF = Buffer.from([0xFF, 0xD9]);
let stream = null;
connection.on('message', function(message) {
if (message.type === 'binary') {
if(!stream) {
stream = fs.createWriteStream(generateImageName())
// this could be any Writable Stream
// not necessarily a file stream.
// It can be an HTTP request for example.
}
// Check if it's the end
if(JPEG_EOF.compare(message.binaryData) === 0) {
console.log('done');
stream.end(message.binaryData);
stream = null;
return;
}
// You will need to implement a back pressure mechanism
stream.write(message.binaryData)
}
});
});
Клиент
/** ... **/
client.on('connect', function(connection) {
fs.createReadStream('./some-image.jpg')
.on('data', chunk => {
connection.sendBytes(chunk);
});
});
/** ... **/
В приведенном выше примере будут обрабатываться только jpeg
изображения, поскольку он напрямую проверяет последние 2 байта jpeg, вы можете реализовать логику для других типов файлов.
В этом примере я предполагаю, что вы будете передавать только 1 изображение за раз для каждого соединения, иначе оно будет перепутано.
Прямо сейчас вам необходимо реализовать механизм противодавления для .write
, что означает, что вы должны проверить возвращаемое значение и дождаться события drain
. Я представлю пример с пользовательским Readable stream
позже, когда у меня будет больше времени для правильной обработки противодавления
UPDATE
С помощью следующего фрагмента, поскольку реализован поток Readable
, мы можем использовать .pipe
, который будет обрабатывать противодавление.
const { Readable } = require('stream');
class ImageStream extends Readable {
constructor() {
super();
this.chunks = [];
this.EOF = Buffer.from([0xFF, 0xD9]);
}
add(chunk) {
this.chunks.push(chunk);
if(this.isPaused()) {
this.resume();
// Need to call _read if instead of this.push('') you return without calling .push
// this._read();
}
}
_read() {
const chunk = this.chunks.shift();
if(!chunk) { // nothing to push, pause the stream until more data is added
this.pause();
return this.push(''); // check: https://nodejs.org/api/stream.html#stream_readable_push
// If you return without pushing
// you need to call _read again after resume
}
this.push(chunk);
// If the last 2 bytes are not sent in the same chunk
// This won't work, you can implement some logic if that can happen
// It's a really edge case.
const last = chunk.slice(-2);
if(this.EOF.compare(last) == 0)
this.push(null); // Image done, end the stream.
}
}
/* ... */
wsServer.on('request', function(request) {
const connection = request.accept(null, request.origin);
let stream = null;
connection.on('message', function(message) {
if (message.type === 'binary') {
if(!stream) {
stream = new ImageStream();
stream.pipe(fs.createWriteStream(generateImageName()));
// stream.pipe(request(/* ... */));
stream.on('end', () => {
stream = null; // done
});
}
stream.add(message.binaryData);
}
});
connection.on('close', function(connection) {
// close user connection
});
});