Angular Observable ожидание ответа сообщения websocket в течение тайм-аута - PullRequest
0 голосов
/ 26 февраля 2019

Я пишу службу websocket в моем приложении Angular.Теперь я хотел бы отправить сообщение на сервер и подождать некоторое время для ответа.Чтобы определить ответ, я работаю с идентификатором sync- и sync-response-ID.

    import { Injectable } from '@angular/core';
    import {webSocket, WebSocketSubject} from "rxjs/webSocket";
    import {of, from, Observable, timer } from "rxjs";
    import {share, filter, takeUntil} from "rxjs/operators";

    class WsMessage{
      Id: number;
      TimeStamp: Date;
      ResponseId: number;
      MessageData: string;
    }

    class MessageQueue{
      Message: WsMessage;
      TimeStamp: Date;
      SyncId: number;
    }

@Injectable({
  providedIn: 'root'
})
export class WsService{
  // Any message has a maximum lifetime of 10 mimnutes.
  private readonly MaxMessageQueueTime = 600;
  private clearQueueTimer: any;
  private messageQueue: Array<MessageQueue> = new Array<MessageQueue>(); 
  private socket: WebSocketSubject<WsMessage>;
  private idGenerator: number;

  constructor(){
        // Check every 30s for clearing old messages.
    this.clearQueueTimer = setInterval(() =>{
      if(this.messageQueue){
        let maxTime = new Date();
        maxTime = new Date(maxTime.setTime(maxTime.getTime() + 
(this.MaxMessageQueueTime * 1000)));
        this.messageQueue= this.messageQueue.filter(m => m.TimeStamp <= maxTime);
  }
}, 30000)
  }

  public Init(url: string){
      this.idGenerator = 0;
      if(this.socket!= null) {
        this.socket.unsubscribe();
        this.socket = null;
      }

      this.socket = webSocket<WsMessage>({
        url: url,
        openObserver: {
          next: value => {
            console.log(value);
          }
        }
      });
      this.socket.subscribe(msg => {
        if(msg != null && msg.Id > 0 && this.messageQueue.filter(i => i.SyncId == msg.Id).length == 0){
          let m = new MessageQueue();
          m.SyncId = msg.Id;
          m.Message = msg;
          m.TimeStamp = new Date();  
          this.messageQueue.push(m);            
        }
        console.log("msg received: " + msg);
      },
      error1 => {
        console.log("error received: " + error1);
      },
      () => {
        console.log("connection closed!");
      });      
    }
    public sendAndWait(message: WsMessage, timeout: number): Observable<WsMessage>{
    if(message== null){
      return null;
    }
    if(this.socket == null){
      return null;
    }
    let id = this.idGenerator++;
    message.Id = id;

    this.socket.next(message);
    let obs = from(this.messageQueue).pipe(share());
    let timeoutTimer = timer(5000);
    let result = obs.pipe(filter(m => m.ResponseId == id)).pipe(takeUntil(timeoutTimer)).pipe(map(m => m.Message)).pipe(share());
    result.subscribe(value => {
      this.messageQueue= this.messageQueue.filter(m => m.ResponseId != value.Id)
});
    return result;
  }    
}

Я не уверен, является ли данная процедура безопасной и не имеет ли утечки?Должен ли я быть осторожным с любой подпиской, которую я не отписал?

Спасибо!

Обновление:

  • добавил BehaviourSubject, чтобы любой мог подписаться на полученные сообщения
  • Добавлен ngDestroy для отмены подписки сокета
  • Создан пример стекаблица для лучшей читаемости

Пример стекаблица

...