Подача данных из веб-сокета через наблюдаемые в классах обслуживания компоненты - PullRequest
0 голосов
/ 04 сентября 2018

У меня есть приложение, которое подключается к веб-сокету для получения данных. Подача данных только односторонняя, поэтому клиент не будет отправлять данные на сервер

Я создал WebSocketService, который подписывается на веб-сокет (используя stomp), а затем отправляет данные в наблюдаемое, когда приходит сообщение

private createSocketObservable<T>(topic: string) {
    return Observable.create((obs: Observer<T>) => {
        // Subscribe to the topic, and on each message the observer pushes the parsed data
        const subscription = this.stompClient.subscribe(topic, (message: Message) => {
            const jsonData = JSON.parse(message.body);
            obs.next(jsonData);
        });
        this.subscriptions.push(subscription);

    });
}

Затем я хочу иметь разные классы обслуживания, которые подписываются на разные темы в веб-сокете и передают данные, поступающие из вышеприведенного, наблюдаемым компоненту, возможно, изменяя или фильтруя эти данные в пути (например, используя карту). .

Я создал абстрактный класс WebSocketSubscription, который эти службы могут расширять и который содержит наблюдаемую информацию, возвращаемую из WebSocketService

export abstract class WebSocketSubscriber<T> {
    abstract topic: string;

    webSocketFeed: Observable<T>;

    constructor(private webSocketService: WebSocketService) {
        this.webSocketService.connect().then(() => {
        this.webSocketFeed = this.webSocketService.getObservable(this.topic);
    });
}

Как мне теперь настроить его так, чтобы любой класс обслуживания, который расширяет WebSocketSubscriber, мог вызывать такие функции, как map, Different, Filter и т. Д., И чтобы компонент мог подписаться на результат этого фида?

Кроме того, подписка на компонент, вероятно, произойдет до того, как будет подключен веб-сокет. Вначале я думал использовать обещание подождать, пока соединение не будет установлено, но мне было бы интересно узнать, существует ли способ RxJ для справиться с этим тоже

1 Ответ

0 голосов
/ 04 сентября 2018

Используя ReplaySubject, вы можете запомнить предыдущие сообщения тем для сервисов, которые подписываются позже:

@Injectable()
export class WebSocketServiceService {

  private socketSubjects: { [key: string]: ReplaySubject<any> } = {}

  private createSocketObservable<T>(topic: string) {
    if(this.socketSubjects[topic]){
      return this.socketSubjects[topic].asObservable();
    }
    else{
      this.socketSubjects[topic] = new ReplaySubject();
      const subscription = this.stompClient.subscribe(topic, (message: Message) => {
      const jsonData = JSON.parse(message.body);
      this.socketSubjects[topic].next(jsonData);
    });
    this.subscriptions.push(subscription);
    }
  }
}

При создании ReplaySubject вы можете установить предел памяти:

new ReplaySubject(5) // remember last 5 items

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...