Rx JS крючок отмены подписки - PullRequest
2 голосов
/ 27 мая 2020

Имея фрагмент кода, который извлекает некоторые данные в кеш и синхронизирует их. к указанным данным в какой-то момент времени?

Я хотел бы подключиться к вызову unsubscribe() для наблюдаемого (и проверить, равно ли количество подписчиков 0), но я не уверен, как этого добиться .

Реальный сценарий - это запрос graphql (apollo- angular) и подписка на веб-сокет для синхронизации данных c. Когда нет подписчиков на данные (например, пользователь перешел на другую страницу), соединение с веб-сокетом может быть закрыто для экономии ресурсов сервера, но данные должны быть удалены из кеша одновременно, иначе это может go рассинхронизируется c, так как от сервера больше не поступают события pu sh на случай, если пользователь снова вернется. Я не хочу хранить данные в кеше, который устарел.

Итак ... Как я могу подключиться к событию, когда наблюдаемый объект уничтожается / все подписчики отказались от подписки на удаление данных из кеша как хорошо? Служба настраивает запрос graphql и настраивает подписку для прослушивания изменений, возвращая Observable, но я не уверен, как я могу разорвать эту вещь, поскольку экземпляр был передан в коде пользователя.

Я не ищу только сеть, обход кеша и аналогичные настройки клиента apollo, я ищу способ фактически использовать, но сохраняйте данные в кеше apollo чистыми и актуальными, поскольку сервер предоставляет способ сделать это.

1 Ответ

1 голос
/ 27 мая 2020

Если вы используете Rx JS, вы можете использовать WebSocketSubject, который имеет несколько отличных опций.

Например, WebSocketSubject может быть задан объект конфигурации, где один полезный вариант - closeObserver: NextObserver<T>, чей метод next вызывается, когда соединение сокета становится закрытым.

Соединение автоматически закрывается, когда WebSocketSubject не имеет активных подписчиков .


Вот код для приведенной выше идеи:

const connectionClosed = new Subjet();
const unsubscribe = new Subject();

const webSocket = new WebSocketSubject({
  url: '',
  /* ... */,
  closeObserver: connectionClosed
});

connectionClosed.subscribe(() => {
  // Teardown logic
  // e.g clear cache 
});


// Registering subscribers
webSocket.pipe(takeUntil(unsubscribe)).subscribe(...);
webSocket.pipe(takeUntil(unsubscribe)).subscribe(...);
webSocket.pipe(takeUntil(unsubscribe)).subscribe(...);

// In `ngOnDestroy`, called maybe when the user navigates to another route
unsubscribe.next();
unsubscribe.complete();
...