NodeJS Bull Остановить очередь заданий на сбое задания - PullRequest
0 голосов
/ 02 мая 2020

У меня есть несколько Bull очередей в моем NodeJS проекте, которые будут выполняться, если предыдущая очередь была выполнена успешно. Я пытаюсь проверить некоторые адреса электронной почты здесь.

  1. Проверьте формат электронной почты (formatQueue)

  2. Наличие электронной почты использование пакета npm email-existence (creationQueue)

Процесс formatQueue занимает меньше времени, при этом выполняется RegEx и проверка формата Email. но выполнение пакета email-existence занимает около 5-10 секунд.

formatQueue и existenceQueue работают правильно, если меньше jobs, например 20-100. но когда я добавляю более 1000 рабочих мест одновременно, existenceQueue завершается с ошибкой ниже

myemail@email.com job stalled more than allowable limit

Я проверил проблему ЗДЕСЬ и ЗДЕСЬ , Я думал, что процесс занимает слишком много времени, чтобы ответить, поэтому добавил limiter как указано ЗДЕСЬ . Но это не помогает мне.

Если задание в какой-либо очереди не удается, следующее задание не обрабатывается. Он остановится на этом, а остальные задания останутся в состоянии waiting.

Мой код похож на приведенный ниже код. Пожалуйста, помогите мне с проблемой.

Очередь. js

var formatQueue = new Queue('format', "redis-db-url");
var existenceQueue = new Queue('existence', "redis-db-url");

// ------------ function for adding to queue ------------
module.exports.addToQueue = (emails) => {
    emails.forEach(element => {
        formatQueue.add(element, { attempts: 3, backoff: 1000 });
    });
}

// ------------ Queue Process -------------

// Format Test Process
formatQueue.process(function(job, done){
    FormatTest.validate(job.data, (err, data) => {
        if(err) done();
        else{
            job.data = data;
            done();
        }
    });
});

// Existence Test Process
formatQueue.process(function(job, done){
    ExistenceTest.validate(job.data, (err, data) => {
        if(err) done();
        else{
            job.data = data;
            done();
        }
    });
});


// ------------ On Cmplete Handlers ------------
formatQueue.on('completed', function(job){
    if(job.data.is_well_format){
        existenceQueue.add(job.data, { attempts: 3, backoff: 1000 });
    }else QueueModel.lastStep(job.data)
});

existenceQueue.on('completed', function(job){
    QueueModel.lastStep(job.data)
});


// ------------ To update the emaile ------------
module.exports.lastStep = (data) => {
    Emails.updateEmail(data, (err, updated) => {
        if(!err) {
            formatQueue.clean('completed');
            existenceQueue.clean('completed');
        }
    })
}

1 Ответ

1 голос
/ 06 мая 2020

Попробуйте повторяющиеся задания

var formatQueue = new Queue('format', "redis-db-url");
var existenceQueue = new Queue('existence', "redis-db-url");

// ------------ function for adding to queue ------------
module.exports.addToQueue = (emails) => {
  emails.forEach(element => {
    let jobOptions = {
      repeat: {
        every: 10 * 1000, // Run job every 10 seconds for example
        limit: 3 // Maximum number of times a job can repeat.
      },
      jobId: someUniqueId, // important do not forget this
      removeOnComplete: true, // removes job from queue on success (if required)
      removeOnFail: true // removes job from queue on failure (if required)
    }

    formatQueue.add(element, jobOptions);
  });
}

// ------------ Queue Process -------------

// Format Test Process
formatQueue.process(function (job, done) {
  FormatTest.validate(job.data, (err, data) => {
    if (err) {
      // Done with error
      done(true);
    } else {
      job.data = data;
      // Done without any error
      done(false);
    }
  });
});

// Existence Test Process
existenceQueue.process(function (job, done) {
  ExistenceTest.validate(job.data, (err, data) => {
    if (err) {
      // Done with error
      done(true);
    } else {
      job.data = data;
      // Done without any error
      done(false);
    }
  });
});


// ------------ On Complete Handlers ------------
formatQueue.on('completed', function (job) {
  if (job.data.is_well_format) {
    let jobOptions = {
      repeat: {
        every: 10 * 1000, // Run job every 10 seconds for example
        limit: 3 // Maximum number of times a job can repeat.
      },
      jobId: someUniqueId, // important do not forget this
      removeOnComplete: true, // removes job from queue on success (if required)
      removeOnFail: true // removes job from queue on failure (if required)
    }

    existenceQueue.add(job.data, jobOptions);
  } else QueueModel.lastStep(job.data)
});

existenceQueue.on('completed', function (job) {
  QueueModel.lastStep(job.data)
});


// ------------ To update the email ------------
module.exports.lastStep = (data) => {
  Emails.updateEmail(data, (err, updated) => {
    if (!err) {
      formatQueue.clean('completed');
      existenceQueue.clean('completed');
    }
  })
}

...