Аналог async.queue для функций Async / Await - PullRequest
0 голосов
/ 29 апреля 2018

Я модернизирую некоторый код. Он имеет кусок для загрузки базы данных, реализованный как:

var customerQueue = async.queue(insertCustomer, DATABASE_PARALLELISM);
customerQueue.drain = function() {
    logger.info('all customers loaded');
    airportCodeMappingQueue.push(airportCodeMappings);
}

Функция insertCustomer используется для написания с обратными вызовами. Я изменил его на async / await, как часть модернизации кода.

Теперь, подумайте, что я написал эквивалент async.queue как:

let customerQueueElements = [];
var customerQueue = {};
customerQueue.push = (customers) => {
  customers.forEach(customer => {
    customerQueueElements.push(insertCustomer(customer))
  });
}

const processQueue = async (queue, parallelism) => {
  for (let i = 0; i < queue.length; i += parallelism) {
    for (let j = 0; j < parallelism; j++) {
      let q = []
      if (queue[i + j]) {
        q.push(queue[i + j])
      }
      await Promise.all(q)
    }
  }
}

Теперь я могу сделать await ProcessQueue(customerQueue, DATABASE_PARALLELISM), но синтаксис плохой, и я сохраняю видимую именованную переменную для каждой очереди.

Что было бы хорошим способом справиться с этим?

Кроме того, drain() должен быть подключен к then, верно?

1 Ответ

0 голосов
/ 29 апреля 2018

@ Берги правильно, насколько направление. Я собрал рабочую версию:

module.exports = function () {
  module.internalQueue = []
  module.func = undefined
  module.parallelism =  1

  const process = async () => {
    for (let i = 0; i < module.internalQueue.length; i += module.parallelism) {
      for (let j = 0; j < module.parallelism; j++) {
        let q = []
        if (module.internalQueue[i + j]) {
          q.push(module.func(module.internalQueue[i + j]))
        }
        await Promise.all(q)
      }
    }
    module.internalQueue = []
    module.drain()
  }

  module.queue = (func, parallelism = 1) => {
    module.func = func
    module.parallelism = parallelism
    return module
  }

  module.push = async (customers) => {
    module.internalQueue.push(customers)
    await process()
  }

  module.drain = () => {}

  return module
}

Это не совсем правильно, пока. Подписи аналогичны пакету async, но моя версия разделяет очередь между всеми экземплярами.

Необходимо найти простой способ создания экземпляра для каждого function с отдельным, например, «локальная» очередь. Тогда он в основном будет работать как оригинальный.

Будет обновляться по мере продвижения.

...