Rx JS загружает и кеширует сначала N значений при уведомлении, а затем отправляет значения одно за другим при следующих уведомлениях - PullRequest
0 голосов
/ 07 февраля 2020

Я создаю приложение angular, в котором я пытаюсь реализовать как можно больше реактивным способом, используя Rx JS.

Я реализую что-то вроде карусели - где следующий элемент отображается на действия пользователя. Товары принимаются от BE по одному. Я пытаюсь сделать пользовательский опыт гладким. Итак, я хочу предварительно загрузить первые N элементов, прежде чем что-либо показывать пользователю.

После загрузки N элементов - мне нужно показать первый загруженный элемент. Когда пользователь нажимает «следующий» - я запускаю следующий предварительно загруженный элемент и запускаю предварительную загрузку следующего элемента, чтобы убедиться, что количество предварительно загруженных элементов всегда равно N.

Тип буфера - чтобы не заставлять пользователя ждать на каждом шаге. Итак, следующие элементы предварительно загружаются, пока пользователь все еще просматривает предыдущие.

Я думал использовать что-то вроде bufferCount(N) при загрузке элементов, а затем распределить массив с помощью map((ar) => from(ar)). И с другим уведомителем Subject, использующим zip для запуска выбросов из этого буфера ... Но, похоже, он работает не очень хорошо. Похоже, что при каждом выбросе N у меня возникает какой-то сбой, когда я сначала вижу один элемент, а затем быстро другой.

Не уверен, как реализовать это лучше. Похоже, что это должен быть общий случай использования.

---- Edit ----

Элементы загружаются через http, верно. Вот код, который у меня есть atm (не совсем функциональный - просто концепция):

//..........
// loadRndItem loads data using httpClient
this.loadingBuffer$ =  this.nextItemSubj.pipe(
      // here should somehow trigger first N loading processes
      flatMap(() => this.loadRndItem()),
      bufferCount(this.bufferSize),
      flatMap((ar) => {
        return from(ar);
      }),
      share(),
      takeUntil(this.endSubj)
    );

this.currentItem$ = zip(
    this.loadingBuffer$,
    this.nextItemSubj
  ).pipe(
    map(([val, _]) => val),
    share(),
    takeUntil(this.gameEndSubj)
  );

  //..........
function nextItem(): void {
  this.nextItemSubj.next();
}

Ответы [ 2 ]

1 голос
/ 07 февраля 2020

Я думаю, вы могли бы сделать это так:

const click$ = new Subject();
const loadMore$ = new Subject();

const buffer$ = concat(of(N), loadMore$)
  .pipe(
    // Unpack the array of results with `concatAll()` into separate emissions
    concatMap(n => forkJoin(makeNRequests(n)).pipe(concatAll())),
  );

zip(buffer$, click$.pipe(startWith(1)))
  .pipe(
     map((result, index) => {
       if (index > 0) { // We need `index` so that's why this is not inside `tap`.
         loadMore$.next(1);
       }
       return result[0];
     }),
   )
  .subscribe(console.log);

Демонстрация в реальном времени: https://stackblitz.com/edit/rxjs-kdh9uk?file=index.ts

0 голосов
/ 07 февраля 2020

Я думаю, что мне удалось получить то поведение, которое я хотел.

Может быть, немного хакерский ... Здесь есть еще одна подписка. Вот код:

nextItemSubj: Subject<void> = new Subject();

// loading CACHE_SIZE items on init. And load more on nextItemSubj emission
// spread the loaded array into separate emissions
this.itemsBuffer$ = concat(
    from(new Array(CACHE_SIZE).fill(null)),
    this.nextItemSubj
  ).pipe(
    flatMap(() => this.loadRndItem()),
    bufferCount(CACHE_SIZE),
    mergeMap((ar) => {
      return from(ar);
    }),
    share(),
    takeUntil(this.endSubj)
  );

// emit preloaded values one by one when nextItemSubj.next() is called
this.currentQuizData$ = zip(
    this.itemsBuffer$,
    this.nextItemSubj
  ).pipe(
    map(([val, _]) => val),
    shareReplay(),
    takeUntil(this.endSubj)
  );

// when itemBuffer$ first emits (after CACHE_SIZE items are preloaded)
// this will trigger first emission of currentItem$ to display
// and also trigger next item loading
this.itemsBuffer$.pipe(take(1)).subscribe(() => this.nextItemSubj.next());
...