как запланировать работу cron для каждого события webhook, используя node js и heroku - PullRequest
0 голосов
/ 14 января 2020

В моем приложении у меня есть маршрут express для прослушивания событий веб-хука (выводится в JSON). Каждое событие содержит дату в будущем. Например:

    "event": {
        "name": "event 1",
        "next_step_date": "1427997766000"
         }
}

Мне нужно запланировать работу на эту будущую дату. Поэтому я установил пользовательский процесс синхронизации Heroku, но я не знаю, как заставить его запускаться для каждого нового события webhooks

Вот мой маршрут для прослушивания событий webhook в моем индексе. js file

// Route that receives a POST request from webhook
app.post('/test/webhook/deal/closed-won/default-pipeline', async (req, res) => {

  console.log('Received webhook');
  const objectId = req.body.objectId;
  res.send(`You sent an object with id: ${objectId} to Express`)

вот мои часы. js файл:

const CronJob = require('cron').CronJob
const amqp = require('amqp-connection-manager')

const AMQP_URL = process.env.CLOUDAMQP_URL || 'amqp://localhost';
if (!AMQP_URL) process.exit(1)

const WORKER_QUEUE = 'worker-queue'  // To consume from worker process
const CLOCK_QUEUE = 'clock-queue'  // To consume from clock process
const JOBS = [{  // You could store these jobs in a database
  name: "Cron process 1",
  message: { "taskName": "createRenewalDeal", "queue": "worker-queue" },  // message in json format
  crontTime: "0 17 03 01 * *",  // Every 1st of every month at 3:17 AM
  repeat: 1
}]

// Create a new connection manager from AMQP
var connection = amqp.connect([AMQP_URL])
console.log('[AMQP] - Connecting...') 

connection.on('connect', function() {
  process.once('SIGINT', function() {  // Close conn on exit
    connection.close() 
  })
  console.log('[AMQP] - Connected!')
  return startCronProcess(JOBS)
})

connection.on('disconnect', function(params) {
  return console.error('[AMQP] - Disconnected.', params.err.stack) 
})

const startCronProcess = (jobs) => {
  if (jobs && jobs.length) {
    jobs.forEach(job => {
      let j = new CronJob({
        cronTime: job.cronTime ? job.cronTime : new Date(job.dateTime),
        onTick: () => {
          sendMessage(job.message)
          if (!job.repeat) j.stop()
        },
        onComplete: () => {
          console.log('Job completed! Removing now...')
        },  
        timeZone: 'America/Argentina/Buenos_Aires',
        start: true  // Start now
      })
    })
  }
}

const sendMessage = (data) => {
  let message
  try {
    message = JSON.parse(data)
  } catch(e) {
    console.error(e)
  }
  if (!message) { return }

  let queue = message.queue || WORKER_QUEUE
  let senderChannelWrapper = connection.createChannel({
    json: true,
    setup: function(channel) {
      return channel.assertQueue(queue, {durable: true});
    }
  })

  senderChannelWrapper.sendToQueue(queue, message, { contentType: 'application/json', persistent: true })
    .then(function() {
      console.log('[AMQP] - Message sent to queue =>', queue)
      senderChannelWrapper.close()
    })
    .catch(err => {
      console.error('[AMQP] - Message to queue => '+queue+ '<= was rejected! ', err.stack)
      senderChannelWrapper.close()
    })
}

и мой рабочий. js


const amqp = require('amqp-connection-manager')

const AMQP_URL = process.env.CLOUDAMQP_URL || 'amqp://localhost';
if (!AMQP_URL) process.exit(1)

const WORKER_QUEUE = 'worker-queue' 

// Create a new connection manager from AMQP
var connection = amqp.connect([AMQP_URL])
console.log('[AMQP] - Connecting....') 

connection.on('connect', function() {
  process.once('SIGINT', function() { // Close conn on exit
    connection.close()
  })
  return console.log('[AMQP] - Connected!')
})

connection.on('disconnect', function(params) {
  return console.error('[AMQP] - Disconnected.', params.err.stack) 
})

// ---------- To receive the execution task messages
let channelWrapper = connection.createChannel({
  json: true,
  setup: function(channel) {
    return Promise.all([
      channel.assertQueue(WORKER_QUEUE, { autoDelete: false, durable: true }),
      channel.prefetch(1),
      channel.consume(WORKER_QUEUE, onMessage)
    ])
  }
})

channelWrapper.waitForConnect()
  .then(function() {
    console.log('[AMQP] - Listening for messages on queue => '+WORKER_QUEUE)
  })
  .catch(function(err) {
    console.error('[AMQP] - Error! ', err)
  })

// Process message from AMQP
function onMessage(data) {
  let message
  try {
    message = JSON.parse(data.content.toString())
  } catch(e) {
    console.error('[AMQP] - Error parsing message... ', data)
  }

  console.log('[AMQP] - Message incoming... ', message)
  channelWrapper.ack(data)
  if (!message) {
    return
  }

  switch (message.taskName) {
    case 'createRenewalDeal': 
      // do something....
      break

    default:
      console.error('No task was found with name => '+message.taskName)
  }
}

Спасибо, что помогли мне понять, как связать все это вместе

Best,

...