Используя rx js 6, как я могу создать отменяемый конвейер веб-запросов с кешем и который ограничивает количество запросов, происходящих одновременно - PullRequest
0 голосов
/ 15 апреля 2020

Я надеюсь, что кто-то сможет указать мне правильное направление, потому что я борюсь с сочетанием параллелизма и способностью отменить поставленных в очередь запросов в rx js. Я собираюсь попытаться объяснить это в последовательных событиях. Скажем, у нас есть наблюдаемая А, которая получает массив строк.

События: А наблюдает: ['dog', 'cat', 'elephant', 'tiger'] В нисходящем направлении проверяется, кэшируется ли сетевой ответ строки если он существует в кеше, он получает его из кеша, если нет, то запрашивает его из Интернета и сохраняет наблюдаемое в кеш с помощью publishReplay / shareReplay. Существует ограничение на 2 сетевых запроса одновременно, поэтому он пытается извлечь «собака» и «кошку» из API (эта операция занимает более 2000 мс). Через 1000 мс A наблюдает другой набор значений: ['dog', 'rat', 'horse', 'rabbit'].

Далее должно произойти следующее. Я не хочу, чтобы запросы "собака" и "кошка" были отменены, я хочу, чтобы они закончили свой запрос, но слова "слон" и "тигр" с первого запроса я хочу игнорировать. Как только «собака» и «кошка» получают ответ «крыса» и «лошадь» во втором кадре, должны быть запрошены из сети, и, наконец, когда запрашивается любое из этих разрешений «кролик».

Здесь мой текущий код. Я попытался переключиться между отсрочкой и для для networkObservable, и поведение отличается, и ни то, ни другое я не хочу.


const cache = new Map();

// Fake Promise to fake a api request
function later(delay, value) {
  console.log('requesting', value);
  return new Promise(resolve => setTimeout(resolve, delay, value));
}


const testObservable = of(['dog', 'rat', 'horse', 'rabbit']).pipe(
  delay(1000),
  startWith(['dog', 'cat', 'elephant', 'tiger'])
);

testObservable.pipe(
  map(array => from(array).pipe(
    publish(arrayObservable => {
      const cachedObservable = arrayObservable.pipe(
        filter(id => cache.has(id)),
        flatMap(id => cache.get(id), 1)
      );
      const uncachedObservable = arrayObservable.pipe(
        filter(id => !cache.has(id)),
        flatMap(id => {
          const networkObservable = from(later(2000, id)).pipe(
            tap(e => console.log('response', e)),
            map(e => 'parsed: ' + e),
            tap(e => console.log('parsed', e)),
            publishReplay(1),
            refCount(),
            take(1)
          );
          cache.set(id, networkObservable);
          return networkObservable;
        }, 2)
      );
      return merge(cachedObservable, uncachedObservable);
    })
  )),
  switchAll()
)

Это приводит к выводу:

requesting dog
requesting cat
requesting rat
requesting horse
response dog
parsed parsed: dog
response rat
parsed parsed: rat
requesting rabbit
response horse
parsed parsed: horse
response rabbit
parsed parsed: rabbit

Какой близко к желаемому поведению, но с одним явным недостатком. Требуется крыса и лошадь, и она не ждет, пока собака и кошка решат, прежде чем будут казнены. Тем не менее, «тигр» и «слон» были правильно расположены, поэтому функциональность работает.

Нужно ли мне создавать отдельную тему, которая обрабатывает запросы?

1 Ответ

0 голосов
/ 16 апреля 2020

Я попытался решить эту интересную проблему, по крайней мере, из того, что я понял.

Начальная точка - testObservable, которая является потоком Arrays<string>. Каждый string из этих массивов представляет собой потенциальный запрос к серверной службе. В каждый момент времени может быть не более двух запросов, поэтому должен быть установлен какой-то механизм очереди. Для этого я использую concurrency параметр mergeMap.

. Ключевым моментом здесь является то, что всякий раз, когда testObservable генерирует новый массив, любой запрос, связанный с string s, содержится в массивах. испущенное ранее, которое еще не было отправлено удаленной службе, должно быть остановлено.

Поэтому я начинаю создавать поток объектов, содержащий string, который является входом для вызова удаленной службы, а также индикатор остановки как это

testObservable
  .pipe(
    mergeMap((a) => {
      i++;
      if (arrays[i - 1]) {
        arrays[i - 1].stop = true;
      }
      const thisArray = { stop: false, i };
      arrays[i] = thisArray;
      return from(a.map((_v) => ({ v: _v, ssTop: arrays[i] }))).pipe(
        mergeMap((d) => {
          // d is an object containing the parameter for the remote call and an object of type {stop: boolean, i: number}
          // for every string of every array a d is emitted
        }, 2)
      );
    })
  )

Затем для каждого отправленного d я могу реализовать logi c, который гарантирует, что вызов к удаленной службе выполняется, только если для флага stop не установлен true вот так

d.ssTop.stop
            ? NEVER
            : from(later(2000, d.v))

Обратите внимание, что флаг stop для массива i устанавливается равным true каждый раз, когда массив i+1 th выдается testObservable, гарантируя, что нет вызовы, относящиеся к i -ому массиву, делаются после того, как i+1 -й массив был отправлен.

Это может выглядеть так, как выглядит полный код

const cache = new Map();

// Fake Promise to fake a api request
function later(delay, value) {
  console.log("requesting", value);
  return new Promise((resolve) => setTimeout(resolve, delay, value));
}

const testObservable = of(["dog", "rat", "horse", "rabbit"]).pipe(
  delay(1000),
  startWith(["dog", "cat", "elephant", "tiger"])
);

let i = 0;
let arrays: { stop: boolean; i: number }[] = [];

testObservable
  .pipe(
    mergeMap((a) => {
      i++;
      if (arrays[i - 1]) {
        arrays[i - 1].stop = true;
      }
      const thisArray = { stop: false, i };
      arrays[i] = thisArray;
      return from(a.map((_v) => ({ v: _v, ssTop: arrays[i] }))).pipe(
        mergeMap((d) => {
          return d.ssTop.stop
            ? NEVER
            : cache[d.v]
            ? of(`${d.v} is the returned from cache}`)
            : from(later(2000, d.v)).pipe(
                map((v: any) => {
                  cache[v] = v;
                  return `${v} is the returned value ${d.ssTop.i}`;
                })
              );
        }, 2)
      );
    })
  )
  .subscribe({
    next: (d) => {
      console.log(d);
    },
  });
...