Как узнать, когда выборка закончится, не блокируя основной поток? - PullRequest
1 голос
/ 27 мая 2020

Я делаю запросы к API, но их сервер разрешает только определенное количество активных подключений, поэтому я хотел бы ограничить количество текущих выборок. Для моих целей выборка завершается (а не продолжается) только тогда, когда тело ответа HTTP поступает к клиенту.

Я хотел бы создать такую ​​абстракцию:

const fetchLimiter = new FetchLimiter(maxConnections);
fetchLimiter.fetch(url, options); // Returns the same thing as fetch()

Это будет сделать вещи намного проще, но, похоже, нет способа узнать, когда поток, используемый другим кодом, заканчивается, потому что потоки блокируются, пока они читаются. Можно использовать ReadableStream.tee(), чтобы разделить поток на два, использовать один и вернуть другой вызывающему (возможно, также создав с ним Response), но это снизит производительность, верно?

1 Ответ

1 голос
/ 27 мая 2020

Поскольку fetch использует обещания, вы можете воспользоваться этим, чтобы создать простую систему очередей.

Это метод, который я использовал раньше для постановки в очередь вещей, основанных на обещаниях. Он ставит элементы в очередь, создавая Promise, а затем добавляя его преобразователь в массив. Конечно, пока это обещание не разрешится, await не будет вызывать любые последующие обещания.

И все, что нам нужно сделать, чтобы начать следующую выборку, когда она завершится, - это просто захватить следующий преобразователь и вызвать его. Обещание выполняется, и затем начинается fetch!

Лучшая часть, так как мы фактически не потребляем результат fetch, не нужно беспокоиться о необходимости clone или чего-то еще ... мы просто передать его неповрежденным, чтобы вы могли использовать его позже then или что-то в этом роде.

* Изменить: , так как тело все еще передается после разрешения обещания выборки, я добавил третий вариант, чтобы вы могли передать тип тела, а FetchLimiter извлекал и анализировал тело за вас.

Все они возвращают обещание, которое в конечном итоге разрешается с фактическим содержимым.

Таким образом, вы можете просто заставить FetchLimiter проанализировать тело за вас. Я сделал так, чтобы он возвращал массив [response, data], чтобы вы все еще могли проверять такие вещи, как код ответа, заголовки и т. Д. c.

В этом отношении вы даже можете передать обратный вызов или что-то в этом роде await, если вам нужно сделать что-то более сложное, например, различные методы синтаксического анализа тела в зависимости от кода ответа.

Пример

Я добавил комментарии, чтобы указать, где начинается и заканчивается код FetchLimiter ... остальное - просто демонстрационный код.

Он использует подделку fetch с использованием setTimeout, разрешение которого составляет 0,5–1,5 секунды. Он немедленно запустит первые три запроса, а затем будут заполнены активные объекты и будет ждать разрешения одного из них.

Когда это произойдет, вы увидите комментарий о том, что обещание было разрешено, а затем запускается следующее обещание в очереди, а затем вы увидите then from в разрешении for l oop. Я добавил это then, чтобы вы могли видеть порядок событий.

(function() {
  const fetch = (resource, init) => new Promise((resolve, reject) => {
    console.log('starting ' + resource);
    setTimeout(() => {
      console.log(' - resolving ' + resource);
      resolve(resource);
    }, 500 + 1000 * Math.random());
  });

  function FetchLimiter() {
    this.queue = [];
    this.active = 0;
    this.maxActive = 3;
    this.fetchFn = fetch;
  }
  FetchLimiter.prototype.fetch = async function(resource, init, respType) {
    // if at max active, enqueue the next request by adding a promise
    // ahead of it, and putting the resolver in the "queue" array.
    if (this.active >= this.maxActive) {
      await new Promise(resolve => {
        this.queue.push(resolve); // push, adds to end of array
      });
    }
    this.active++; // increment active once we're about to start the fetch
    const resp = await this.fetchFn(resource, init);
    let data;
    if (['arrayBuffer', 'blob', 'json', 'text', 'formData'].indexOf(respType) >= 0)
      data = await resp[respType]();
    this.active--; // decrement active once fetch is done
    this.checkQueue(); // time to start the next fetch from queue
    return [resp, data]; // return value from fetch
  };

  FetchLimiter.prototype.checkQueue = function() {
    if (this.active < this.maxActive && this.queue.length) {
      // shfit, pulls from start of array. This gives first in, first out.
      const next = this.queue.shift();
      next('resolved'); // resolve promise, value doesn't matter
    }
  }

  const limiter = new FetchLimiter();
  for (let i = 0; i < 9; i++) {
    limiter.fetch('/mypage/' + i)
      .then(x => console.log(' - .then ' + x));
  }
})();

Предостережения:

  • Я не на 100% уверен, что тело все еще транслируется, когда обещание разрешается ... похоже, это вас беспокоит. Однако, если это проблема, вы можете использовать один из методов примеси тела, например blob или text или json, который не разрешается, пока содержимое тела не будет полностью проанализировано ( см. Здесь )

  • Я намеренно сделал его очень коротким (например, 15 строк реального кода) в качестве очень простого доказательства концепции. Вы бы хотели добавить некоторую обработку ошибок в производственный код, чтобы, если fetch отклоняется из-за ошибки подключения или чего-то еще, вы все равно уменьшаете активный счетчик и запускаете следующий fetch.

  • Конечно, он также использует синтаксис async/await, потому что его намного легче читать. Если вам нужна поддержка старых браузеров, вы захотите переписать с помощью Promises или транспилировать с помощью babel или аналогичного.

...