Как прослушать завершенное событие в очереди быков - только для текущей работы - PullRequest
0 голосов
/ 01 апреля 2019

При запуске кода ниже он печатает со многими результатами. я подозреваю, что событие completed прослушивает все предыдущие задания, выполненные в текущем экземпляре очереди.

Как мне управлять завершенным событием, чтобы прослушивать только текущее задание?

producer.js

Производитель создает задание с числовым идентификатором по умолчанию и прослушивает глобальное завершение, чтобы вернуть ответ, когда задание выполнено.

const BullQ = require('bull');

let bullQ = BullQ('my-first-queue', {
  redis: {
    host: process.env.REDISHOST || 'localhost',
    password: process.env.REDISPASSWORD || ''
  }
});

app.get('/search/:term', async (req, res) => {
  const job = await bullQ.add({
    searchTerm: req.params.term
  });

  // Listen to the global completion of the queue in order to return result.
  bullQ.on('global:completed', (jobId, result) => {
    // Check if id is a number without any additions
    if (/^\d+$/.test(jobId) && !res.headersSent) {
      console.log(`Producer get: Job ${jobId} completed! Result: ${result}`);
      res.json(`Job is completed with result: ${result}`);
    }
  });
});

consumer.js

Потребитель имеет 2 роли. 1. Чтобы потреблять рабочие места, как это должно быть по книге 2. Для создания новых рабочих мест на основе результата последнего задания.

const BullQ = require('bull');

let bullQ = BullQ('my-first-queue', {
  redis: {
    host: process.env.REDISHOST || 'localhost',
    password: process.env.REDISPASSWORD || ''
  }
});

bullQ.process((job, done) => {
  // Simulate asynchronous server request.
  setTimeout(async () => {
    // Done the first job and return an answer to the producer after the timeout.
    done(null, `Search result for ${job.data.searchTerm}`);
    // next job run
    if (counter < 10) {
      // For the first run the id is just a number if not changed via the jobId in JobOpts,
      // the next time the job id will be set to {{id}}_next_{{counter}} we need only the first number in order not to end with a long and not clear concatenated string.
      let jobID = (/^\d+$/.test(job.id)) ? job.id : job.id.replace(/[^\d].*/,'');
      await createNextJob(jobID, ++counter);
    }
  }, 100);
});

// Create next job and add it to the queue.
// Listen to the completed jobs (locally)
const createNextJob = async (id, counter) => {
  const nextJob = bullQ.add({
    searchTerm: "Next job"
  }, {
    jobId: `${id}_next_${counter}`
  });

  await bullQ.on('completed', (job, result) => {
    job.finished();
    console.log(`Consumer(next): Job ${job.id} completed! Result: ${result}`);
  });
};
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...