Две очереди, одна с параллелизмом 1 другая с параллелизмом 3 - PullRequest
0 голосов
/ 19 октября 2018

У меня такая ситуация:

const q1 = async.queue((task,cb) => task(cb), 1);
const q2 = async.queue((task,cb) => task(cb), 3);

В основном, ситуация такова: если я могу получить блокировку на q1, я могу обрабатывать до 3 вещей одновременно в q2.Ака, большинство вещей в моей программе должны работать последовательно, но, конечно, все может работать параллельно, но только если они сначала получат «блокировку» на q1.Тем не менее, написание этого оказывается гораздо сложнее, чем ожидалось.Я думаю, что это похоже на проблему блокировки чтения / записи.https://en.wikipedia.org/wiki/Readers%E2%80%93writer_lock

Другими словами, я могу обрабатывать вещи только в q2, если у меня есть блокировка на q1.Одна из проблем с реализацией этого заключается в том, что если мне всегда придется получать блокировку на q1 в критической секции q2, то я не смогу использовать более высокий уровень параллелизма в q2, он всегда будет последовательным.

Единственная идея, которая у меня есть сейчас, это перевернуть логическое значение, что-то вроде этого:

let q2HasLock = false;

if(q2hasLock === true){
  q2.push(cb => ...);
}
else{
  q1.push(cb => ...);
}

, тогда я могу использовать это:

q2.drain = () => {
   q2HasLock = false;
};

, но остальная часть - это яне знаю, как / когда перевернуть q2HasLock на true.Надеюсь, вы понимаете проблему / идею.Другая проблема, связанная с этой реализацией, заключается в том, что запросы на передачу в q2 могут привести к истощению запросов, идущих в q1, поэтому я мог бы сделать это:

let q2HasLock = false, count = 0 ;

if(q2hasLock === true && count < 3){
  count++;
  q2.push(cb => ...);
}
else{
  q1.push(cb => ...);
}

q2.drain = () => {
   count = 0;
   q2HasLock = false;
};

Это становится проблематично - я хотел бы реализовать это какнасколько возможно!

1 Ответ

0 голосов
/ 21 октября 2018

Хорошо, так что это должно удовлетворить требования, но было бы неплохо найти более простой способ, чем этот:

    const async = require('async');
    const q1 = async.queue((task, cb) => task(cb), 1);
    const q2 = async.queue((task, cb) => task(cb), 3);

    const map = new Map();
    let count = 0;

    const getProm = function (q) {

      if (!map.has(q)) {
        map.set(q, new Promise(resolve => {

          q1.push(cb => {
            resolve();  // 'lock' on q1 has been obtained, nothing can use q1 until we release the lock by calling cb().
            q.drain = () => {
              q.drain = null;
              count = 0;
              map.delete(q);
              cb();
            };
          });

        }));
      }

      return map.get(q);

    };


 if(foo && count < 5){
   return getProm(q2).then(v => {

      q2.push(cb => {
        setTimeout(cb, 140);
      });

    });
 }

 return q1.push(cb => {
      setTimeout(cb, 30);
    });

что это делает - одновременно может выполняться только один тип задач, ноесли тип задачи foo, то 3 может выполняться одновременно.Проверка count < 5 гарантирует, что запросы q1 не будут истощены запросами q2.

Хитрость заключается в использовании обещаний.Как только обещание разрешено, вы все равно можете вызвать promise.then() со значением разрешения, хотя в этом случае нам не нужно это значение.

Для чуть более высокой производительности мы могли бы использовать логические флаги вместо обещаний,но логика здесь намного сложнее.

...