Угловой RxJS: уменьшение никогда не завершается - PullRequest
1 голос
/ 07 мая 2019

Я закодировал компонент двумя кнопками (searchButton, lazyButton). ngOnDestroy это:

public ngOnDestroy() {
    this.unsubscribe$.next();
    this.unsubscribe$.complete();
}

Я создал две наблюдаемые из события нажатия двух кнопок:

this.eager$ = Rx.Observable.fromEvent(this.searchButton, 'click')
    .takeUntil(this.$unsubscribe);

this.lazy$ = Rx.Observable.fromEvent(this.lazyButton, 'click')
    .takeUntil(this.$unsubscribe);

Таким образом, когда компонент уничтожается, обе наблюдаемые автоматически отписываются.

С другой стороны, каждый раз, когда нажимается кнопка, мне нужно сделать http-запрос:

Observable
    .merge(this.eager$, this.lazy$)
    .switchMap(() => this.service.getItemsFromStorage())

Итак, теперь я получаю Observable<Response>. Поэтому мне нужно разобраться с ответом:

Observable
    .merge(this.eager$, this.lazy$)
    .switchMap(() => this.service.getItemsFromStorage())
    .map(response => response.json())                        (****)
    .map(page => <Array<AdministrationUser>>page.included)   (****)

Итак, после этого я получаю Observable<Array<AdministrationUser>>.

Прежде всего, мне нужно применить некоторые преобразования к каждому AdministrationUser. Поэтому мне нужно создать Observable<AdministrationUser>:

Observable
    .merge(this.eager$, this.lazy$)
    .switchMap(() => this.service.getItemsFromStorage())
    .map(response => response.json())
    .map(page => <Array<AdministrationUser>>page.included)
    .switchMap(page => Observable.from(page.users))  (****)

Итак, теперь я могу изменить каждого пользователя:

Observable
    .merge(this.eager$, this.lazy$)
    .switchMap(() => this.service.getItemsFromStorage())
    .map(response => response.json())
    .map(page => <Array<AdministrationUser>>page.included)
    .switchMap(page => Observable.from(page.users))
    .do(user => /*modify user*/)  (****)

Так что после этого мне нужно только собрать всех полученных пользователей снова:

Observable
    .merge(this.eager$, this.lazy$)
    .switchMap(() => this.service.getItemsFromStorage())
    .map(response => response.json())
    .map(page => <Array<AdministrationUser>>page.included)
    .switchMap(page => Observable.from(page.users))
    .do(user => /*modify user*/)
    .reduce((acc, value) => [...acc, value], [])   (****)
    .do(users => this.rowsToShow = [...users])    (&&$$&&)
    .takeUntil(this.unsubscribe$)
    .subscribe();

После того, как все пользователи снова собраны, их необходимо назначить полю rowsToShow.

Проблема в том, что код на (&&$$&&) никогда не достигается.

Я полагаю, что из-за поведения reduce ждет ожидаемых результатов. Но я не знаю, почему это никогда не заканчивается, когда я закодировал, что, как только HTTP-ответ получен, наблюдаемое становится .switchMap(page => Observable.from(page.users)) Таким образом, наблюдаемые переключаются на наблюдаемые элементы массива, которые автоматически заканчиваются, когда массив пересекается.

Есть идеи?

Ответы [ 2 ]

0 голосов
/ 07 мая 2019

Причина в том, что сокращение ожидает окончательного выброса от наблюдаемого родителя, прежде чем выбросить окончательное уменьшенное значение.Поскольку ваши родительские наблюдаемые, которые вы объединяете, являются событиями щелчка, таким образом, они продолжают излучать значения, пока не будет достигнут порог.

.switchMap => Это ничего не делает, кроме разрешенияНаблюдаемый он возвращает и предоставляет значение.Использование switchMap не означает конец эмиссии.Каждый раз, когда будет возвращено новое значение, switchMap будет уменьшать Observable.from (page.users) и возвращать значение.

Вместо того, чтобы использовать приведенное ниже, просто отписать Observables при уничтожении компонента.Вы также можете изменить наблюдаемое, как показано ниже:

.reduce((acc, value) => {this.rowsToShow = [...acc, value];
    return this.rowsToShow }, [])   (****)
    //.do(users => this.rowsToShow = [...users])    (&&$$&&)
    .takeUntil(this.unsubscribe$)
    .subscribe();
0 голосов
/ 07 мая 2019

Ваша наблюдаемая источник - .merge(this.eager$, this.lazy$), которая является горячей наблюдаемой и никогда не завершается, поэтому ваш поток не будет завершен, попробуйте добавить first() после слияния, которое будет принимать только одно излучение из источника и завершает его

В качестве альтернативы вы можете поставить уменьшение во внутреннюю наблюдаемую

.switchMap(page => Observable.from(page.users).reduce(....))
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...