У меня есть приложение, которое подключается к веб-сокету для получения данных. Подача данных только односторонняя, поэтому клиент не будет отправлять данные на сервер
Я создал WebSocketService, который подписывается на веб-сокет (используя stomp), а затем отправляет данные в наблюдаемое, когда приходит сообщение
private createSocketObservable<T>(topic: string) {
return Observable.create((obs: Observer<T>) => {
// Subscribe to the topic, and on each message the observer pushes the parsed data
const subscription = this.stompClient.subscribe(topic, (message: Message) => {
const jsonData = JSON.parse(message.body);
obs.next(jsonData);
});
this.subscriptions.push(subscription);
});
}
Затем я хочу иметь разные классы обслуживания, которые подписываются на разные темы в веб-сокете и передают данные, поступающие из вышеприведенного, наблюдаемым компоненту, возможно, изменяя или фильтруя эти данные в пути (например, используя карту). .
Я создал абстрактный класс WebSocketSubscription, который эти службы могут расширять и который содержит наблюдаемую информацию, возвращаемую из WebSocketService
export abstract class WebSocketSubscriber<T> {
abstract topic: string;
webSocketFeed: Observable<T>;
constructor(private webSocketService: WebSocketService) {
this.webSocketService.connect().then(() => {
this.webSocketFeed = this.webSocketService.getObservable(this.topic);
});
}
Как мне теперь настроить его так, чтобы любой класс обслуживания, который расширяет WebSocketSubscriber, мог вызывать такие функции, как map, Different, Filter и т. Д., И чтобы компонент мог подписаться на результат этого фида?
Кроме того, подписка на компонент, вероятно, произойдет до того, как будет подключен веб-сокет. Вначале я думал использовать обещание подождать, пока соединение не будет установлено, но мне было бы интересно узнать, существует ли способ RxJ для справиться с этим тоже