У меня есть несколько Bull очередей в моем NodeJS
проекте, которые будут выполняться, если предыдущая очередь была выполнена успешно. Я пытаюсь проверить некоторые адреса электронной почты здесь.
Проверьте формат электронной почты (formatQueue)
Наличие электронной почты использование пакета npm email-existence
(creationQueue)
Процесс formatQueue
занимает меньше времени, при этом выполняется RegEx
и проверка формата Email
. но выполнение пакета email-existence
занимает около 5-10 секунд.
formatQueue
и existenceQueue
работают правильно, если меньше jobs
, например 20-100. но когда я добавляю более 1000 рабочих мест одновременно, existenceQueue
завершается с ошибкой ниже
myemail@email.com job stalled more than allowable limit
Я проверил проблему ЗДЕСЬ и ЗДЕСЬ , Я думал, что процесс занимает слишком много времени, чтобы ответить, поэтому добавил limiter
как указано ЗДЕСЬ . Но это не помогает мне.
Если задание в какой-либо очереди не удается, следующее задание не обрабатывается. Он остановится на этом, а остальные задания останутся в состоянии waiting
.
Мой код похож на приведенный ниже код. Пожалуйста, помогите мне с проблемой.
Очередь. js
var formatQueue = new Queue('format', "redis-db-url");
var existenceQueue = new Queue('existence', "redis-db-url");
// ------------ function for adding to queue ------------
module.exports.addToQueue = (emails) => {
emails.forEach(element => {
formatQueue.add(element, { attempts: 3, backoff: 1000 });
});
}
// ------------ Queue Process -------------
// Format Test Process
formatQueue.process(function(job, done){
FormatTest.validate(job.data, (err, data) => {
if(err) done();
else{
job.data = data;
done();
}
});
});
// Existence Test Process
formatQueue.process(function(job, done){
ExistenceTest.validate(job.data, (err, data) => {
if(err) done();
else{
job.data = data;
done();
}
});
});
// ------------ On Cmplete Handlers ------------
formatQueue.on('completed', function(job){
if(job.data.is_well_format){
existenceQueue.add(job.data, { attempts: 3, backoff: 1000 });
}else QueueModel.lastStep(job.data)
});
existenceQueue.on('completed', function(job){
QueueModel.lastStep(job.data)
});
// ------------ To update the emaile ------------
module.exports.lastStep = (data) => {
Emails.updateEmail(data, (err, updated) => {
if(!err) {
formatQueue.clean('completed');
existenceQueue.clean('completed');
}
})
}