При запуске кода ниже он печатает со многими результатами.
я подозреваю, что событие completed
прослушивает все предыдущие задания, выполненные в текущем экземпляре очереди.
Как мне управлять завершенным событием, чтобы прослушивать только текущее задание?
producer.js
Производитель создает задание с числовым идентификатором по умолчанию и прослушивает глобальное завершение, чтобы вернуть ответ, когда задание выполнено.
const BullQ = require('bull');
let bullQ = BullQ('my-first-queue', {
redis: {
host: process.env.REDISHOST || 'localhost',
password: process.env.REDISPASSWORD || ''
}
});
app.get('/search/:term', async (req, res) => {
const job = await bullQ.add({
searchTerm: req.params.term
});
// Listen to the global completion of the queue in order to return result.
bullQ.on('global:completed', (jobId, result) => {
// Check if id is a number without any additions
if (/^\d+$/.test(jobId) && !res.headersSent) {
console.log(`Producer get: Job ${jobId} completed! Result: ${result}`);
res.json(`Job is completed with result: ${result}`);
}
});
});
consumer.js
Потребитель имеет 2 роли.
1. Чтобы потреблять рабочие места, как это должно быть по книге
2. Для создания новых рабочих мест на основе результата последнего задания.
const BullQ = require('bull');
let bullQ = BullQ('my-first-queue', {
redis: {
host: process.env.REDISHOST || 'localhost',
password: process.env.REDISPASSWORD || ''
}
});
bullQ.process((job, done) => {
// Simulate asynchronous server request.
setTimeout(async () => {
// Done the first job and return an answer to the producer after the timeout.
done(null, `Search result for ${job.data.searchTerm}`);
// next job run
if (counter < 10) {
// For the first run the id is just a number if not changed via the jobId in JobOpts,
// the next time the job id will be set to {{id}}_next_{{counter}} we need only the first number in order not to end with a long and not clear concatenated string.
let jobID = (/^\d+$/.test(job.id)) ? job.id : job.id.replace(/[^\d].*/,'');
await createNextJob(jobID, ++counter);
}
}, 100);
});
// Create next job and add it to the queue.
// Listen to the completed jobs (locally)
const createNextJob = async (id, counter) => {
const nextJob = bullQ.add({
searchTerm: "Next job"
}, {
jobId: `${id}_next_${counter}`
});
await bullQ.on('completed', (job, result) => {
job.finished();
console.log(`Consumer(next): Job ${job.id} completed! Result: ${result}`);
});
};