Bull queue: как остановить обработку оставшихся заданий при сбое задания? - PullRequest
0 голосов
/ 07 мая 2020

Я использую очередь bull для обработки некоторых заданий. В текущем сценарии каждое задание - это своего рода операция. Поэтому всякий раз, когда операция (задание) из списка операций в очереди терпит неудачу, очередь должна прекратить обработку оставшихся заданий (операций).

Что я пробовал до сих пор?

Итак, я пытался приостановить очередь при сбое определенного задания. Затем очередь возобновляется, когда она истощается. Теперь, когда он возобновляется, очередь не начинается с неудачного задания, а переходит к следующему заданию.

var Queue = require('bull');

let redisOptions = {
  redis: { port: 6379, host: '127.0.0.1' },
  limiter: { max: 1, duration: 1000 }
}
var myQueue = new Queue('Linear-Queue', redisOptions);

myQueue.process('Type-1', function (job, done) {
  setTimeout(() => {
    done(job.data.error);
  }, job.data.time);
});

let options = {
  attempts: 3,
  removeOnComplete: false, // removes job from queue on success
  removeOnFail: false // removes job from queue on failure
}

setTimeout(() => {
  myQueue.add('Type-1', { time: 10000, description: "Type-1 One", error: false }, options);
}, 1 * 1000);

setTimeout(() => {
  myQueue.add('Type-1', { time: 5000, description: "Type-1 two", error: true }, options);
}, 2 * 1000);

setTimeout(() => {
  myQueue.add('Type-1', { time: 3000, description: "Type-1 three", error: false }, options);
}, 3 * 1000);


myQueue.on('completed', function (job, result) {
  console.log("Completed: " + job.data.description);
});

myQueue.on('failed', async function (job, error) {
  console.log("Failed: " + job.data.description);
  try {
    await myQueue.pause();
  } catch (error) {
    console.log(error);
  }
});

myQueue.on('drained', async function () {
  try {
    await myQueue.resume();
  } catch (error) {
    console.log(error);
  }
});

Текущий вывод:

Current result

Ожидаемый результат: если Type-1 two успешно завершается с 3-й попытки.

Completed: Type-1 One
Failed: Type-1 two
Failed: Type-1 two
Completed: Type-1 two
Completed: Type-1 three

Ожидаемый результат: если Type-1 two также не удалось выполнить 3-ю попытку.

Completed: Type-1 One
Failed: Type-1 two
Failed: Type-1 two
Failed: Type-1 two

Все, что мне нужно, это то, что очередь должна останавливать обработку новых заданий до тех пор, пока текущее задание не будет выполнено без сбоев. В случае сбоя, сбойное задание должно выполняться x раз. При попытке x+1 он должен очистить (удалить все задания) очередь. Итак, как добиться этого линейного поведения в очереди.

1 Ответ

0 голосов
/ 11 мая 2020
var Queue = require('bull');

let redisOptions = {
  redis: {
    port: 6379,
    host: '127.0.0.1'
  },
  // Maximum one job is processed every 5 seconds.
  limiter: {
    max: 1,
    duration: 5000
  },
  settings: {
    backoffStrategies: {
      // Custom backoff strategy.
      myStrategy: function (attemptsMade, error) {
        return error.delay || 60 * 1000;
      }
    }
  }
};

var myQueue = new Queue('Linear-Queue', redisOptions);

myQueue.process('Type-1', function (job, done) {
  setTimeout(() => {
    // compare attemptsMade with 3, to test 'on-all-attempts-fail' scenario.
    if (job.attemptsMade == 2) {
      done(false);
    } else {
      done(job.data.error);
    }
  }, job.data.time);
});

let options = {
  attempts: 3,
  backoff: {
    type: 'myStrategy'
  },
  removeOnComplete: false, // Set to true if job has to be removed on success.
  removeOnFail: false // Set to true if job has to be removed on failure.
}

for (let i = 1; i <= 10; i++) {
  let error = false;

  if (i == 2) {
    error = true
  }

  myQueue.add('Type-1', { time: i * 1000, description: "Type-1 Job-" + i, error: error }, options);

  // You can also add job with some time gap.
  // setTimeout(i => {
  //   myQueue.add('Type-1', { time: i * 1000, description: "Type-1 Job-" + i, error: error }, options);
  // }, 1000, i);
}

myQueue.on('completed', async function (job, result) {
  console.log("Completed: Job " + job.id);
});

myQueue.on('failed', async function (job, error) {
  console.log("Failed: Job " + job.id + " on attempt " + job.attemptsMade);
  handelFailure(job);
});

myQueue.on('error', function (error) {
  console.log("Queue: on error");
  console.log(error);
});

async function handelFailure(currentJob) {
  if (currentJob.opts.attempts == currentJob.attemptsMade) {
    // Remove all jobs in queue and clan the redis.
    await myQueue.clean(70 * 1000, 'wait');
    await myQueue.clean(70 * 1000, 'active');
    await myQueue.clean(70 * 1000, 'failed');
    await myQueue.clean(70 * 1000, 'paused');
    await myQueue.clean(70 * 1000, 'delayed');
    await myQueue.empty();
    return;
  }

  let pendingJobs = await myQueue.getJobs(['waiting', 'active', 'failed', 'paused', 'delayed'], 0, -1, true);
  console.log("Failing all remaining " + pendingJobs.length + " jobs...");
  for (let i = 0; i < pendingJobs.length; i++) {
    let currentJobId = parseInt(currentJob.id);
    let pendingJobId = parseInt(pendingJobs[i].id);

    if (pendingJobId <= currentJobId) {
      continue;
    }

    let errorInfo = {
      delay: (70 * 1000) + (i * 5 * 1000),
      message: "Moving " + pendingJobId + " to failed queue."
    }

    await pendingJobs[i].moveToFailed(errorInfo, true);
  }
}

Выход 1: Когда задание 2 успешно завершилось с 3-й попытки.

enter image description here

Выход 2: Когда задание 2 завершилось неудачно со всеми тремя попыток (очередь перестает обрабатывать оставшиеся задания, как ожидалось).

enter image description here

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...