Я пытаюсь преобразовать следующий ответ из rxjs 5,5 в 6 (код ниже тоже)
Цель сценария - запустить сеть (или TLS)сервер и использовать RXJS для прослушивания другого события.
Событием может быть:
- соединение
- secureConnection (для tls)
- data
- error
- connect
- end
- ... и несколько других
Я уточню вопрос, если я добьюсь прогресса, не стесняйтесь, если хотите.
На данный момент идея состоит в том, чтобы просто иметь базовый скрипт с минимальным значением connection
, secureConnection
, data
и end
.
const Rx = require('rxjs');
const net = require('net');
const uuid = require('uuid');
module.exports = () => {
const sockets = new Map();
const ids = new Map();
const GetSocket = _id => sockets.get(_id);
const GetId = _socket => ids.get(_socket);
const SetSocket = _socket =>{
_socket.setEncoding('utf8');
const _id = uuid();
sockets.set(_id, _socket);
ids.set(_socket,_id);
return _id;
};
const server = net.createServer({ allowHalfOpen: true });
const socketStream = Rx.Observable.fromEvent(server, 'connection');
const RemoveSocket = socket => {
const id = ids.get(socket);
sockets.delete(id);
ids.delete(socket)
console.log("[server.js] socket closed..");
};
const socketObservable = socket => SetSocket(socket) &&
Rx.Observable
.of({
action: 'CONNECTION',
socket: GetId(socket)
})
.merge(
Rx.Observable
.fromEvent(socket,'data')
.map(d=>{
try {return JSON.parse(d);}
catch (e) {
console.log(e);
return d;
}
})
.map(msg=>{
return Object.assign({action:msg,socket:GetId(socket)})
})
)
.takeUntil(Rx.Observable.fromEvent(socket, 'end')
.map(()=>RemoveSocket(socket)));
const Print = ()=>{
//ids.forEach(id=> console.log(GetSocket(id)));
//ids.clear();
console.log("total connected socket : " + ids.size);
};
const startServer = port => server.listen(port) &&
socketStream
.flatMap(socketObservable);
console.log("[server.js] Starts Started" );
return {startServer, Print , stop: () => server.close()};
};
Спасибо