Не удалось подключить рабочий процесс к очереди Bull - PullRequest
0 голосов
/ 29 февраля 2020

Я активно использую библиотеку Bull для нашего проекта. Несколько дней назад мы обнаружили проблему при обработке задания в Bull Queue. Мы можем добавить работу в Bull Queue, но она не может обработать работу. Когда я зарегистрировался в Taskforce, мы заметили, что задание задерживается / не выполняется с пустым ответом об ошибке. Я думаю, что это происходит потому, что рабочий процесс не связан с очередью. Тот же код работает нормально, если я попробую перезапустить процесс через несколько раз. Я не совсем уверен, если это проблема Redis или Bull или то, как я их использую. Проблема в том, что я могу добавить работу, но она не обрабатывается. (Неожиданно только в некоторых случаях)

Пример кода:

const BullQueue = require('bull');
class Queue {
    constructor(name, connectionName = DBConstant.localRedisConnectionName) {
        const options = {};
        options.redis = DBConnectionUseCase.getRedisConnectionOptions(connectionName);
        this._queue = new BullQueue(name, options);
        this._connectionName = connectionName;
        this.queueName = name;
   }
    async initProcessor() {
        try {
            //TODO: We have noticed that in some cases process does not get attached to Queue.
            //We tried adding await here and check result but did not get anything. This needs to be debug.
            this._queue.process((job, done) => {
                this.process(job, done);
            });
        }catch(error) {
            console.log(`Worker :: ${this.queueName} :: Exception in processor initialisation :: connectionName ::  ${this._connectionName} :: Error :: ${error.message} :: ${JSON.stringify(error)}`);
        }
        console.log(`Worker :: ${this.queueName} :: processor initialised :: connectionName :: ${this._connectionName}`);
    }

    getQueueName() {
        return this.queueName;
    }

    addJob(data, options) {
        console.log(`Worker :: ${this.queueName} :: Job added in Queue :: ${this._connectionName}`);
        return this._queue.add(data, options);
    }
}

class FetchCustomerData extends Queue{
  constructor() {
    super(QUEUE_NAME);
  }

  /**
   * 
   */
  static getInstance() {
    if(!queueInstance) {
      queueInstance = new FetchCustomerData();
    }
    return queueInstance;
  }

  /**
   * 
   */
  initDefaultJob() {
    const data = {};
    const options = {
        // This cron will run in everyday at 12:30AM ISE
        repeat: {
          cron: '0 20 * * *'
        }
    };

    this.addJob(data, Object.assign({},options, constant.BULL_JOB_OPTIONS));
    console.log(`Worker :: ${QUEUE_NAME} :: initial job added :: options :: ${JSON.stringify(Object.assign({},options, constant.BULL_JOB_OPTIONS))}`);
  }

  /**
   * 
  */
  async process(job, done) {
    try {
        ...
        return done(nu;;);
    }catch(error) {
      console.error(`FetchActivationalUCC.process :: ${uuid} :: Exception :: ${JSON.stringify(error)}`);
      return done(error);
    }
  }
}

Как я его использую:

FetchCustomerData.initProcessor();
const res = await FetchCustomerData.addJob({"key": 123});

Список пакетов:

"bull": "3.7.0"
"ioredis": "4.9.0"
"node": v10.15.0
...