В rxjs как отписаться от "внутренних" "интервальных" наблюдаемых с помощью takeUntil или takeWhile - PullRequest
0 голосов
/ 02 ноября 2018

[РЕДАКТИРОВАНИЕ 11/5] После тестирования с некоторыми из предложенных решений я заметил, что некоторые из моих импортных rxjs были неправильными (я импортировал из 'rxjs / internal' iso из 'rxjs'). Так что, если вы столкнетесь с такой ошибкой, вы можете взглянуть на нее.

На Stackoverflow уже существует пара вопросов, связанных с отпиской от наблюдаемой, хотя ни один из них не очень полезен для моей конкретной проблемы.

В моем приложении ("чат") я могу искать пользователей, вводя их имя в поле поиска. После получения пользователей я хочу показать, подключены ли эти пользователи к сети или нет.

Вот мой подход.

Существует объект поведения для захвата поискового запроса:

private term$ = new BehaviorSubject(null);

Всякий раз, когда в термине $ observable появляется новое значение, я начинаю искать новых пользователей:

users$: Observable<User[]> = this.term$.pipe(
    filter(term => !!term),
    switchMap(term => this.usersService.searchUsers(term))
);

Но я также хочу периодически проверять, находятся ли эти пользователи «в сети», поэтому я изменяюсь над Observable следующим образом (обратите внимание на последнюю строку):

users$: Observable<any[]> = this.term$.pipe(
    filter(term => !!term),
    switchMap(term => this.usersService.searchUsers(term)),
    switchMap(users => zip(...users.map(user => this.checkLoggedInF(user))))
);

Для всех пользователей я создаю наблюдаемый интервал. checkLoggedInF - это функция, которая каждые 5 секунд проверяет на сервере, подключен ли данный пользователь:

checkLoggedInF = user => {
    return interval(5000).pipe(
      switchMap(() => this.usersService.isLoggedIn(user.loginnaam).pipe(
        map(loggedIn => ({...user, loggedIn}))
      ))
    );
}

Теперь проблема состоит в том, что всякий раз, когда появляется новый поисковый термин (для термина $ observable), наблюдаемые "интервала checkLoggedIn" должны быть отписаны. Я пытался использовать оператор takeUntil как в наблюдаемом «интервале checkLoggedIn», так и в родительском «users $», но безрезультатно. Также использование оператора takeWhile было неэффективным.

Ответы [ 2 ]

0 голосов
/ 05 ноября 2018

В конце концов, это была глупая ошибка с моей стороны, некоторые из моих «импортов rxjs» были неправильными (я импортировал из rxjs / internal). Когда я исправил импорт, switchMap функционировал должным образом (закрывая подписку при изменении «верхнего потока»).

Короче говоря: не импортировать из 'rxjs / internal'

0 голосов
/ 02 ноября 2018

попробуйте использовать .tap() до switchMap, когда вы звоните checkLoggedInF, и сохраните observable из checkLoggedInF в свойстве

checkLoggedInF = user => {
    this.checkLoggedInF$ = interval(5000).pipe(
      switchMap(() => this.usersService.isLoggedIn(user.loginnaam).pipe(
        map(loggedIn => ({...user, loggedIn}))
      ))
    );
    return this.checkLoggedInF$;
}

users$: Observable<any[]> = this.term$.pipe(
    filter(term => !!term),
    switchMap(term => this.usersService.searchUsers(term)),
    tap(() => { this.checkLoggedInF$ && this.checkLoggedInF$.unsubscribe() })
    switchMap(users => zip(...users.map(user => this.checkLoggedInF(user))))
);

или другой вариант, который вы можете проверить, и unsubscribe() в вашем checkLoggedInF тоже

checkLoggedInF = user => {
   this.checkLoggedInF$ && this.checkLoggedInF$.unsubscribe() 
   this.checkLoggedInF$ = interval(5000).pipe(
     switchMap(() => this.usersService.isLoggedIn(user.loginnaam).pipe(
       map(loggedIn => ({...user, loggedIn}))
     ))
   );
   return this.checkLoggedInF$;
}
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...