Я пишу службу websocket в моем приложении Angular.Теперь я хотел бы отправить сообщение на сервер и подождать некоторое время для ответа.Чтобы определить ответ, я работаю с идентификатором sync- и sync-response-ID.
import { Injectable } from '@angular/core';
import {webSocket, WebSocketSubject} from "rxjs/webSocket";
import {of, from, Observable, timer } from "rxjs";
import {share, filter, takeUntil} from "rxjs/operators";
class WsMessage{
Id: number;
TimeStamp: Date;
ResponseId: number;
MessageData: string;
}
class MessageQueue{
Message: WsMessage;
TimeStamp: Date;
SyncId: number;
}
@Injectable({
providedIn: 'root'
})
export class WsService{
// Any message has a maximum lifetime of 10 mimnutes.
private readonly MaxMessageQueueTime = 600;
private clearQueueTimer: any;
private messageQueue: Array<MessageQueue> = new Array<MessageQueue>();
private socket: WebSocketSubject<WsMessage>;
private idGenerator: number;
constructor(){
// Check every 30s for clearing old messages.
this.clearQueueTimer = setInterval(() =>{
if(this.messageQueue){
let maxTime = new Date();
maxTime = new Date(maxTime.setTime(maxTime.getTime() +
(this.MaxMessageQueueTime * 1000)));
this.messageQueue= this.messageQueue.filter(m => m.TimeStamp <= maxTime);
}
}, 30000)
}
public Init(url: string){
this.idGenerator = 0;
if(this.socket!= null) {
this.socket.unsubscribe();
this.socket = null;
}
this.socket = webSocket<WsMessage>({
url: url,
openObserver: {
next: value => {
console.log(value);
}
}
});
this.socket.subscribe(msg => {
if(msg != null && msg.Id > 0 && this.messageQueue.filter(i => i.SyncId == msg.Id).length == 0){
let m = new MessageQueue();
m.SyncId = msg.Id;
m.Message = msg;
m.TimeStamp = new Date();
this.messageQueue.push(m);
}
console.log("msg received: " + msg);
},
error1 => {
console.log("error received: " + error1);
},
() => {
console.log("connection closed!");
});
}
public sendAndWait(message: WsMessage, timeout: number): Observable<WsMessage>{
if(message== null){
return null;
}
if(this.socket == null){
return null;
}
let id = this.idGenerator++;
message.Id = id;
this.socket.next(message);
let obs = from(this.messageQueue).pipe(share());
let timeoutTimer = timer(5000);
let result = obs.pipe(filter(m => m.ResponseId == id)).pipe(takeUntil(timeoutTimer)).pipe(map(m => m.Message)).pipe(share());
result.subscribe(value => {
this.messageQueue= this.messageQueue.filter(m => m.ResponseId != value.Id)
});
return result;
}
}
Я не уверен, является ли данная процедура безопасной и не имеет ли утечки?Должен ли я быть осторожным с любой подпиской, которую я не отписал?
Спасибо!
Обновление:
- добавил BehaviourSubject, чтобы любой мог подписаться на полученные сообщения
- Добавлен ngDestroy для отмены подписки сокета
- Создан пример стекаблица для лучшей читаемости
Пример стекаблица