Проблема при переносе сокета в Observables - MaxListenersExceededWarning: обнаружена возможная утечка памяти в EventEmitter - PullRequest
0 голосов
/ 08 июня 2018

У меня есть сервер socket.io , где я определил простой класс с именем SocketObs, чтобы обернуть сокет в Observable.

Код следующий

class SocketObs {
    constructor(private socket: any) {}
    onEvent(event): Observable<any> {
        return new Observable<any>((observer: Observer<any>) => {
            this.socket.on(event, data => observer.next(data));
        });
    }
}

Сам сервер реализован классом MyServer, чей код:

export class MyServer {
    public static readonly PORT = 8081;

    private app: express.Application;
    private server: Server;
    private io: socketIo.Server;
    private port: string | number;

    private observables = new Array<any>();

    constructor() {
        this.app = express();
        this.port = process.env.PORT || MobileObjectServer.PORT;
        this.server = createServer(this.app);
        this.io = socketIo(this.server);
        this.listen();

        for(let i = 0; i < 20; i++) {
            this.observables.push(from([1, 2, 3]))
        }
    }

    private listen() {
        this.server.listen(this.port, () => {
            console.log('Running server STRESS on port %s', this.port);
        });

        this.io.on('connect', socket => {
            console.log('Connected client on port %s.', this.port);
            const socketObs = new SocketObs(socket);
            socketObs.onEvent('monitor')
            .subscribe(
                () => this.monitoSubscribeToObservables(socketObs)
            )
        });
    }

    private monitoSubscribeToObservables(socket: SocketObs) {
        this.observables.forEach(obs => {
            obs
            .pipe(takeUntil(socket.onEvent('disconnect')))
            .subscribe(null, null, () => console.log('observable completed'));
        })
    }

    public getApp(): express.Application {
        return this.app;
    }

}

Логика очень проста

  • при создании сервера.заполняет массив 20 наблюдаемыми объектами
  • , которые он устанавливает в режим прослушивания и ждет, пока не появится сообщение «монитор»
  • при получении сообщения «монитор» он подписывается на объекты наблюдения, созданные во время запуска
  • Наблюдаемые подписки завершаются, когда сокет отключен (да, я знаю, что это не совсем так, но мне нужно иметь оператор takeUntil для моего случая)

Если я запускаю сервер в узле, я выполняю логику, но затем получаю следующее предупреждение

(node:76408) MaxListenersExceededWarning: Possible EventEmitter memory leak detected. 11 disconnect listeners added. Use emitter.setMaxListeners() to increase limit

Если я делаю то же самое, но без использования, то есть с этой реализацией

 private listen() {
        this.server.listen(this.port, () => {
            console.log('Running server STRESS on port %s', this.port);
        });

        this.io.on('connect', socket => {
            console.log('Connected client on port %s.', this.port);

            socket.on('monitor', () => {
                this.monitoSubscribeToObservables(socket);
            });

        });
    }

    private monitoSubscribeToObservables(socket: socketIo.Socket) {
        const monitorDisconnected = new Subject<any>();
        this.observables.forEach(obs => {
            obs
            .pipe(takeUntil(monitorDisconnected))
            .subscribe(null, null, () => console.log('observable completed'));
        })
        socket.on('disconnect', () => {
            console.log('Monitor disconnected');
            monitorDisconnected.next();
        });
    }

йПредупреждение больше не выдается.

Я изо всех сил пытаюсь понять причину такого различного поведения.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...