Очередь http звонков в rxjs - PullRequest
       9

Очередь http звонков в rxjs

0 голосов
/ 26 октября 2018

Я работаю над сервисом сеанса, который проверяет, не истек ли токен авторизации.Если это так, он выполняет вызов для обновления токена.Во время этого запроса все входящие запросы должны быть поставлены в очередь и отправлены после завершения запроса.После этого все входящие запросы могут проходить без очереди, пока токен снова не истечет.Я рисую мраморную диаграмму для этого:

1. ---a---b---c--d-----e--
2. -t-------f------t------
3. ---a---b---------cd-e--

Я назвал 1. как incoming$ Наблюдаемый, 2. is valve$ - если это правда, запросы могут проходить, если это ложь, они должныбыть в очереди.Когда это становится правдой, очереди увольняются.

Что я сделал до сих пор?Я думаю, что это должно быть сделано путем добавления промежуточной наблюдаемой, называемой receiver$, которая меняет свое значение в зависимости от valve$.Когда valve$ истинно, он просто возвращает простой предмет, если ложно, он возвращает тот, который способен записывать значения.

receiver$ = valve.pipe(
  map((value) => {
    if (value) {
      return new Subject();
    } else {
      return (new Subject()).pipe(
        shareReplay(),
      );
    }
  })
);

И затем каждое новое значение, полученное в incoming$, должно быть добавлено ктекущий наблюдаемый в recevier$:

incoming$.pipe(
  combineLatest(receiver$),
).subscribe((incomingValue, recevier) => {
  recevier.next(incomingValue);
});

И вот часть, которую я не могу обернуть вокруг.Всякий раз, когда клапан становится истинным, мне нужны последние два значения из receiver$.Второй последний будет содержать очередь, а последний будет содержать активный субъект.Объединив их, я смог достичь своей цели.Я не знаю, как реализовать это и как будут управляться подписки.Кроме того, это выглядит слишком сложным для такого, казалось бы, простого варианта использования.

Каков наилучший способ реализации этого поведения?

Ответы [ 2 ]

0 голосов
/ 26 октября 2018

Вы можете сделать это, просто используя concatMap, который объединяет два разных потока на основе формы значения valve$.Обратите внимание, что для этого необходимо, чтобы и valve$, и incoming$ использовались совместно с share().

valve$
  .pipe(
    concatMap(v => v
      ? incoming$.pipe(takeUntil(valve$))
      : incoming$
        .pipe(
          takeUntil(valve$),
          bufferCount(Number.POSITIVE_INFINITY),
          mergeAll(),
        )
    ),
  )
  .subscribe(console.log)

Демонстрационная версия: https://stackblitz.com/edit/rxjs6-demo-d3bsxb?file=index.ts

0 голосов
/ 26 октября 2018

Вы можете рассмотреть решение в этом направлении.

Сначала вы создаете тему, через которую вы отправляете все запросы, которые хотите сделать

const requests$ = new Subject<Observable<any>>()

Затем вы создаете тему, через которую вы сообщаете о состоянии клапана , т. Е. Можете ли вы выполнить запрос немедленно или нужно его буферизовать

const valve$ = new Subject<boolean>();

Теперь вы можете создать поток, который передает запросы, только если клапан открыт, т.е. если последнее значение, выданное valve$, равно true

const openStream$ = valve$.pipe(
  switchMap(valve => {
    if (valve) {
      return requests$;
    } else {
      return empty();
    }
  })
);

Вы также можете создать поток, который буферизует все запросы, когда клапан закрыт

const bufferedStream$ = requests$.pipe(
  bufferToggle(valve$.pipe(filter(valve => !valve)), () => valve$.pipe(filter(valve => valve))),
  mergeMap(bufferedCalls => bufferedCalls)
)

Теперь все, что вам нужно сделать, это merge openStream$ и bufferedStream$ и subscribe к полученному потоку, как это

merge(openStream$, bufferedStream$).pipe(
  mergeMap(request => request)
)
.subscribe(httpCallResult => {// do stuff})

Я протестировал это решение со следующими данными, имитируя реальные вызовы http с помощью Observables of string

const requests$ = new Subject<Observable<string>>();
setTimeout(() => {requests$.next(of('A'))}, 50);
setTimeout(() => {requests$.next(of('B'))}, 60);
setTimeout(() => {requests$.next(of('C'))}, 100);
setTimeout(() => {requests$.next(of('D'))}, 110);
setTimeout(() => {requests$.next(of('E'))}, 130);
setTimeout(() => {requests$.next(of('F'))}, 250);
setTimeout(() => {requests$.next(of('G'))}, 260);
setTimeout(() => {requests$.next(of('H'))}, 300);
setTimeout(() => {requests$.next(of('I'))}, 310);
setTimeout(() => {requests$.next(of('L'))}, 330);


const valve$ = new Subject<boolean>();
setTimeout(() => {valve$.next(true)}, 30);
setTimeout(() => {valve$.next(false)}, 80);
setTimeout(() => {valve$.next(true)}, 120);
setTimeout(() => {valve$.next(false)}, 200);
setTimeout(() => {valve$.next(true)}, 290);
...