конвертировать rxjs 5.5 в 6 скрипт - PullRequest
0 голосов
/ 18 июня 2019

Я пытаюсь преобразовать следующий ответ из rxjs 5,5 в 6 (код ниже тоже)

Цель сценария - запустить сеть (или TLS)сервер и использовать RXJS для прослушивания другого события.

Событием может быть:

  • соединение
  • secureConnection (для tls)
  • data
  • error
  • connect
  • end
  • ... и несколько других

Я уточню вопрос, если я добьюсь прогресса, не стесняйтесь, если хотите.

На данный момент идея состоит в том, чтобы просто иметь базовый скрипт с минимальным значением connection, secureConnection, data и end.


const Rx = require('rxjs');
const net = require('net');
const uuid = require('uuid');

module.exports  = () => {

    const sockets = new Map();
    const ids = new Map();

    const GetSocket = _id => sockets.get(_id);
    const GetId = _socket => ids.get(_socket);

    const SetSocket = _socket =>{
        _socket.setEncoding('utf8');
        const _id = uuid();
        sockets.set(_id, _socket);
        ids.set(_socket,_id);
        return _id;
    };

    const server = net.createServer({ allowHalfOpen: true });
    const socketStream = Rx.Observable.fromEvent(server, 'connection');

    const RemoveSocket = socket  => {

        const id = ids.get(socket);
        sockets.delete(id);
        ids.delete(socket)

        console.log("[server.js] socket closed..");
    };
    const socketObservable = socket => SetSocket(socket) &&
        Rx.Observable
            .of({
                action: 'CONNECTION',
                socket: GetId(socket)
            })
            .merge(
                Rx.Observable
                    .fromEvent(socket,'data')
                    .map(d=>{
                        try {return JSON.parse(d);}
                        catch (e) {
                            console.log(e);
                            return d;
                        }
                    })
                    .map(msg=>{
                        return Object.assign({action:msg,socket:GetId(socket)})
                    })
            )
            .takeUntil(Rx.Observable.fromEvent(socket, 'end')
                .map(()=>RemoveSocket(socket)));

    const Print = ()=>{
        //ids.forEach(id=> console.log(GetSocket(id)));
        //ids.clear();
        console.log("total connected socket : " + ids.size);
    };

    const startServer = port => server.listen(port) &&
        socketStream
            .flatMap(socketObservable);

    console.log("[server.js] Starts Started" );
    return {startServer, Print , stop: () => server.close()};
};

Спасибо

...