Попытка использовать Angular и RxJ для работы с веб-сокетами SocketIO. Первоначально это работает, однако, когда я перемещаюсь по приложению, а компонент загружается и выгружается, создается несколько подписок для темы сообщения SocketIO. Это приводит к многократной обработке данных на компоненте.
Моим первоначальным решением было отписаться от Subject в событии ngOnDestroy, однако это полностью разрывает Subject и больше не работает, когда я возвращаюсь на страницу с компонентом, когда создается новая подписка.
websocket.service
@Injectable()
export class WebsocketService {
private socket;
constructor() { }
public connect(): Rx.Subject<MessageEvent> {
this.socket = io(WS_URL)
let observable = new Observable(observer => {
this.socket.on('message', (data) => {
observer.next(data)
})
return () => {
this.socket.disconnect();
}
})
let observer = {
next: (data: Object) => {
this.socket.emit('message', JSON.stringify(data));
}
}
return Rx.Subject.create(observer, observable)
}
}
chat.service
@Injectable()
export class ChatService {
public messages: Subject<any>;
public shared: Observable<any>;
constructor(
private ws: WebsocketService
) {
this.messages = <Subject<any>>ws
.connect()
.map((response : any) => {
console.log("Recv " + response.type)
return response
})
this.shared = this.messages.share();
}
sendMessage(msg) {
this.messages.next(msg)
}
}
dashboard.component
export class DashboardComponent implements OnInit, OnDestroy {
subscription_obs: any;
constructor(
private chat: ChatService
) { }
ngOnInit() {
this.subscription_obs = this.chat.shared.subscribe(msg => {
console.log("Dashboard subscription fire!")
})
}
ngOnDestroy() {
this.subscription_obs.unsubscribe()
}
}