RxJS: динамически добавлять и удалять Observable с слиянием - PullRequest
1 голос
/ 09 июня 2019

Я пытаюсь построить подключаемый модуль ProgressService.

Служба должна отслеживать, сколько «вещей» загружается в данный момент. И есть метод isLoading(), который возвращает Observable<boolean>, чтобы указать, загружается ли что-либо вообще.

Мое первое решение было очень наивным с использованием new BehaviorSubject(0), а затем каждый провайдер загрузки просто звонил ProgressService.increase() и ProgressService.decrease(). Это работало нормально. Но теперь я хотел бы стать более реактивным, если это возможно.


Затем я наткнулся на merge, который прекрасно работает, когда все поставщики загрузки известны в начале:

this.progress = merge(...progressProviders).pipe(
  scan((acc, curr) => acc + (curr ? 1 : -1), 0)
);

Это просто увеличит / уменьшит значение progress, когда какой-либо провайдер загрузки выдаст true или false.

Но мне также нужна какая-то функция регистрации / отмены регистрации. В основном это должно добавить новую Observable в цепочку merge (или удалить ее).


Новый ProgressService должен выглядеть так:

class ProgressService {

  progress: Observable<number> = EMPTY; // value > 0 when something is loading

  addLoadingProvider(lp: Observable<boolean>) {
    // increment this.progress, when lp emits true
    // decrease this.progress, when lp emits false
    // nice to have: ignore if lp's first emitted value is false
  }

  removeLoadingProvider(lp: Observable<boolean>) {
    // stop listening to lp
    // clean up: decrease this.progress, if lp last emitted true
  }

  isLoading(): Observable<boolean> {
    return this.progress.pipe(
      map(val => val !== 0)
    );
  }

}

Может быть, метод removeLoadingProvider вообще не нужен (?), Если мы возвращаем Subscription из addLoadingProvider и используем Subscription.unsubscribe() для отмены регистрации.

Надеюсь, кто-нибудь подскажет, как merge и unmerge дополнительные Наблюдаемые по требованию.

1 Ответ

0 голосов
/ 09 июня 2019

Согласно вашему объяснению, я мог бы понять следующее [пожалуйста, поправьте меня, если мое понимание неверно] -

Вы хотите собрать различные наблюдаемые, которые излучают логическое значение, и вы хотите отслеживать такиеЕсли у любого из них есть хотя бы один "true", то ваша окончательная наблюдаемая должна выдавать значение true, в противном случае конечная наблюдаемая должна возвращать "false"

Хотя ваш подход BehaviorSubject является реактивным.Я предлагаю следующий подход;Дайте мне знать, если это имеет смысл в вашем сценарии -

Подход 1 -

enum ListAction {
  Added,
  Removed,
  Empty,
  Undefined
}

export class ProgressService {

  constructor() { }

  progress: Observable<number> = EMPTY; // value > 0 when something is loading

  obsListChanged: BehaviorSubject<ListAction> = new BehaviorSubject<any>(ListAction.Undefined);
  obsList: Array<Observable<boolean>> = [];

  addLoadingProvider(lp: Observable<boolean>) {
    // increment this.progress, when lp emits true
    // decrease this.progress, when lp emits false
    // nice to have: ignore if lp's first emitted value is false
    this.obsList.push(lp);

    this.obsListChanged.next(ListAction.Added);
  }

  removeLoadingProvider(lp: Observable<boolean>) {
    // stop listening to lp
    // clean up: decrease this.progress, if lp last emitted true
    this.obsList = this.obsList.filter(i => i !== lp);
    this.obsListChanged.next(ListAction.Removed);
  }

  isLoading(): Observable<boolean> {
    // return this.progress.pipe(
    //   map(val => val !== 0)
    // );
    return this.obsListChanged.pipe(
      switchMap(() => {
        return combineLatest(this.obsList);
      }),
      map(v => {
        return v.some(loading => loading);
      })
    );
  }
}

Я определил наблюдаемое действие ListAction, если вы хотите выполнять определенные работы согласнозатем ваш ListAction вы можете сделать то же самое в операторах rxjs согласно вашей логике.

Подход 2 [Немного улучшенная версия Подхода 1] -

export class ProgressService {

  constructor() { }

  progress: Observable<number> = EMPTY; // value > 0 when something is loading

  obsList$: BehaviorSubject<Array<Observable<boolean>>> = new BehaviorSubject<Array<Observable<boolean>>>([]);

  addLoadingProvider(lp: Observable<boolean>) {
    // increment this.progress, when lp emits true
    // decrease this.progress, when lp emits false
    // nice to have: ignore if lp's first emitted value is false
    this.obsList$.next([...this.obsList$.getValue(), lp]);
  }

  removeLoadingProvider(lp: Observable<boolean>) {
    // stop listening to lp
    // clean up: decrease this.progress, if lp last emitted true
    const removed = this.obsList$.getValue().filter(i => i !== lp);
    this.obsList$.next(removed);
  }

  isLoading(): Observable<boolean> {
    // return this.progress.pipe(
    //   map(val => val !== 0)
    // );
    return this.obsList$.pipe(
      switchMap(obs => {
        return combineLatest(obs);
      }),
      map(v => {
        return v.some(loading => loading);
      })
    );
  }
}
...