[РЕДАКТИРОВАНИЕ 11/5] После тестирования с некоторыми из предложенных решений я заметил, что некоторые из моих импортных rxjs были неправильными (я импортировал из 'rxjs / internal' iso из 'rxjs'). Так что, если вы столкнетесь с такой ошибкой, вы можете взглянуть на нее.
На Stackoverflow уже существует пара вопросов, связанных с отпиской от наблюдаемой, хотя ни один из них не очень полезен для моей конкретной проблемы.
В моем приложении ("чат") я могу искать пользователей, вводя их имя в поле поиска. После получения пользователей я хочу показать, подключены ли эти пользователи к сети или нет.
Вот мой подход.
Существует объект поведения для захвата поискового запроса:
private term$ = new BehaviorSubject(null);
Всякий раз, когда в термине $ observable появляется новое значение, я начинаю искать новых пользователей:
users$: Observable<User[]> = this.term$.pipe(
filter(term => !!term),
switchMap(term => this.usersService.searchUsers(term))
);
Но я также хочу периодически проверять, находятся ли эти пользователи «в сети», поэтому я изменяюсь над Observable следующим образом (обратите внимание на последнюю строку):
users$: Observable<any[]> = this.term$.pipe(
filter(term => !!term),
switchMap(term => this.usersService.searchUsers(term)),
switchMap(users => zip(...users.map(user => this.checkLoggedInF(user))))
);
Для всех пользователей я создаю наблюдаемый интервал. checkLoggedInF - это функция, которая каждые 5 секунд проверяет на сервере, подключен ли данный пользователь:
checkLoggedInF = user => {
return interval(5000).pipe(
switchMap(() => this.usersService.isLoggedIn(user.loginnaam).pipe(
map(loggedIn => ({...user, loggedIn}))
))
);
}
Теперь проблема состоит в том, что всякий раз, когда появляется новый поисковый термин (для термина $ observable), наблюдаемые "интервала checkLoggedIn" должны быть отписаны. Я пытался использовать оператор takeUntil как в наблюдаемом «интервале checkLoggedIn», так и в родительском «users $», но безрезультатно. Также использование оператора takeWhile было неэффективным.