У меня есть сервер socket.io , где я определил простой класс с именем SocketObs
, чтобы обернуть сокет в Observable.
Код следующий
class SocketObs {
constructor(private socket: any) {}
onEvent(event): Observable<any> {
return new Observable<any>((observer: Observer<any>) => {
this.socket.on(event, data => observer.next(data));
});
}
}
Сам сервер реализован классом MyServer
, чей код:
export class MyServer {
public static readonly PORT = 8081;
private app: express.Application;
private server: Server;
private io: socketIo.Server;
private port: string | number;
private observables = new Array<any>();
constructor() {
this.app = express();
this.port = process.env.PORT || MobileObjectServer.PORT;
this.server = createServer(this.app);
this.io = socketIo(this.server);
this.listen();
for(let i = 0; i < 20; i++) {
this.observables.push(from([1, 2, 3]))
}
}
private listen() {
this.server.listen(this.port, () => {
console.log('Running server STRESS on port %s', this.port);
});
this.io.on('connect', socket => {
console.log('Connected client on port %s.', this.port);
const socketObs = new SocketObs(socket);
socketObs.onEvent('monitor')
.subscribe(
() => this.monitoSubscribeToObservables(socketObs)
)
});
}
private monitoSubscribeToObservables(socket: SocketObs) {
this.observables.forEach(obs => {
obs
.pipe(takeUntil(socket.onEvent('disconnect')))
.subscribe(null, null, () => console.log('observable completed'));
})
}
public getApp(): express.Application {
return this.app;
}
}
Логика очень проста
- при создании сервера.заполняет массив 20 наблюдаемыми объектами
- , которые он устанавливает в режим прослушивания и ждет, пока не появится сообщение «монитор»
- при получении сообщения «монитор» он подписывается на объекты наблюдения, созданные во время запуска
- Наблюдаемые подписки завершаются, когда сокет отключен (да, я знаю, что это не совсем так, но мне нужно иметь оператор
takeUntil
для моего случая)
Если я запускаю сервер в узле, я выполняю логику, но затем получаю следующее предупреждение
(node:76408) MaxListenersExceededWarning: Possible EventEmitter memory leak detected. 11 disconnect listeners added. Use emitter.setMaxListeners() to increase limit
Если я делаю то же самое, но без использования, то есть с этой реализацией
private listen() {
this.server.listen(this.port, () => {
console.log('Running server STRESS on port %s', this.port);
});
this.io.on('connect', socket => {
console.log('Connected client on port %s.', this.port);
socket.on('monitor', () => {
this.monitoSubscribeToObservables(socket);
});
});
}
private monitoSubscribeToObservables(socket: socketIo.Socket) {
const monitorDisconnected = new Subject<any>();
this.observables.forEach(obs => {
obs
.pipe(takeUntil(monitorDisconnected))
.subscribe(null, null, () => console.log('observable completed'));
})
socket.on('disconnect', () => {
console.log('Monitor disconnected');
monitorDisconnected.next();
});
}
йПредупреждение больше не выдается.
Я изо всех сил пытаюсь понять причину такого различного поведения.