Создание очереди обещаний с вложенными обещаниями - PullRequest
0 голосов
/ 06 июня 2019

Я реализую механизм запросов, который массово выбирает и обрабатывает запросы. Я использую async / await.

Прямо сейчас поток выполнения выполняется в иерархии, где есть список элементов, содержащих запросы, и каждый из этих запросов имеет выборку.

Я пытаюсь связать элементы в группы по n, поэтому даже если у каждого из них m запросов с выборками внутри, одновременно выполняется только n запросов; и особенно только один запрос будет сделан одновременно к одному домену.

Проблема в том, что, когда я ожидаю выполнения элементов (на внешнем уровне, через некоторое время, который группирует элементы и будет останавливать итерации до разрешения обещаний), эти обещания разрешаются, когда выполнение внутреннего запроса откладывается из-за внутреннего ожидания по доставке.

Это приводит к тому, что моя очередь только останавливается на мгновение, вместо того, чтобы ждать внутреннего обещания разрешиться.

Это внешний класс очереди:

class AsyncItemQueue {
  constructor(items, concurrency) {
    this.items = items;
    this.concurrency = concurrency;
  }

  run = async () => {
    let itemPromises = [];

    const bundles = Math.ceil(this.items.length / this.concurrency);
    let currentBundle = 0;

    while (currentBundle < bundles) {
      console.log(`<--------- FETCHING ITEM BUNDLE ${currentBundle} OF ${bundles} --------->`);

      const lowerRange = currentBundle * this.concurrency;
      const upperRange = (currentBundle + 1) * this.concurrency;

      itemPromises.push(
        this.items.slice(lowerRange, upperRange).map(item => item.run())
      );

      await Promise.all(itemPromises);

      currentBundle++;
    }
  };
}


export default AsyncItemQueue;

Это простой класс элементов, который работает в очереди. Я опускаю лишний код.

class Item {

// ...

  run = async () => {
    console.log('Item RUN', this, this.name);

    return await Promise.all(this.queries.map(query => {
      const itemPromise = query.run(this.name);
      return itemPromise;

    }));
  }
}

И это запросы, содержащиеся внутри предметов. Каждый элемент имеет список запросов. Опять же, некоторый код удален, так как он не интересен.

class Query {

// ...


  run = async (item) => {
    // Step 1: If requisites, await.
    if (this.requires) {
      await this.savedData[this.requires];
    }

    // Step 2: Resolve URL.
    this.resolveUrl(item);

    // Step 3: If provides, create promise in savedData.
    const fetchPromise = this.fetch();

    if (this.saveData) {
      this.saveData.forEach(sd => (this.savedData[sd] = fetchPromise));
    }


    // Step 4: Fetch.
    const document = await fetchPromise;

    // ...
  }
}

Время в AsyncItemQueue правильно останавливается, но только до тех пор, пока поток выполнения не достигнет шага 3 в Query. Как только он достигает той выборки, которая является оберткой для стандартных функций выборки, внешнее обещание разрешается, и я получаю все запросы, выполняемые одновременно.

Я подозреваю, что проблема где-то в классе Query, но я озадачен тем, как избежать разрешения внешнего обещания.

Я попытался сделать функцию Query class run, возвращающую документ, на всякий случай, но безрезультатно.

Любая идея или руководство будет с благодарностью. Я постараюсь ответить на любые вопросы о коде или предоставить больше при необходимости.

Спасибо!

PS: Вот код и окно с рабочим примером: https://codesandbox.io/s/goofy-tesla-iwzem

Как вы можете видеть на выходе из консоли, цикл while повторяется до завершения выборки, и все они выполняются одновременно.

1 Ответ

1 голос
/ 06 июня 2019

Я решил это.

Проблема была в классе AsyncItemQueue.В частности:

itemPromises.push(
  this.items.slice(lowerRange, upperRange).map(item => item.run())
);

Это подтолкнуло список обещаний в список, и так далее:

await Promise.all(itemPromises);

Не нашел никаких обещаний ждать в этом списке (потому что этосодержал больше списков, с обещаниями внутри).

Решением было изменить код на:

await Promise.all(this.items.slice(lowerRange, upperRange).map(item => item.run()));

Теперь он работает отлично.Элементы запускаются партиями из n, и новая партия не будет запущена, пока не закончится предыдущая.

Я не уверен, что это поможет кому-либо, кроме меня, но я оставлю это здесь на случай, если кто-нибудьнаходит подобную проблему когда-нибудь.Спасибо за помощь.

...