В моем приложении у меня есть маршрут express для прослушивания событий веб-хука (выводится в JSON). Каждое событие содержит дату в будущем. Например:
"event": {
"name": "event 1",
"next_step_date": "1427997766000"
}
}
Мне нужно запланировать работу на эту будущую дату. Поэтому я установил пользовательский процесс синхронизации Heroku, но я не знаю, как заставить его запускаться для каждого нового события webhooks
Вот мой маршрут для прослушивания событий webhook в моем индексе. js file
// Route that receives a POST request from webhook
app.post('/test/webhook/deal/closed-won/default-pipeline', async (req, res) => {
console.log('Received webhook');
const objectId = req.body.objectId;
res.send(`You sent an object with id: ${objectId} to Express`)
вот мои часы. js файл:
const CronJob = require('cron').CronJob
const amqp = require('amqp-connection-manager')
const AMQP_URL = process.env.CLOUDAMQP_URL || 'amqp://localhost';
if (!AMQP_URL) process.exit(1)
const WORKER_QUEUE = 'worker-queue' // To consume from worker process
const CLOCK_QUEUE = 'clock-queue' // To consume from clock process
const JOBS = [{ // You could store these jobs in a database
name: "Cron process 1",
message: { "taskName": "createRenewalDeal", "queue": "worker-queue" }, // message in json format
crontTime: "0 17 03 01 * *", // Every 1st of every month at 3:17 AM
repeat: 1
}]
// Create a new connection manager from AMQP
var connection = amqp.connect([AMQP_URL])
console.log('[AMQP] - Connecting...')
connection.on('connect', function() {
process.once('SIGINT', function() { // Close conn on exit
connection.close()
})
console.log('[AMQP] - Connected!')
return startCronProcess(JOBS)
})
connection.on('disconnect', function(params) {
return console.error('[AMQP] - Disconnected.', params.err.stack)
})
const startCronProcess = (jobs) => {
if (jobs && jobs.length) {
jobs.forEach(job => {
let j = new CronJob({
cronTime: job.cronTime ? job.cronTime : new Date(job.dateTime),
onTick: () => {
sendMessage(job.message)
if (!job.repeat) j.stop()
},
onComplete: () => {
console.log('Job completed! Removing now...')
},
timeZone: 'America/Argentina/Buenos_Aires',
start: true // Start now
})
})
}
}
const sendMessage = (data) => {
let message
try {
message = JSON.parse(data)
} catch(e) {
console.error(e)
}
if (!message) { return }
let queue = message.queue || WORKER_QUEUE
let senderChannelWrapper = connection.createChannel({
json: true,
setup: function(channel) {
return channel.assertQueue(queue, {durable: true});
}
})
senderChannelWrapper.sendToQueue(queue, message, { contentType: 'application/json', persistent: true })
.then(function() {
console.log('[AMQP] - Message sent to queue =>', queue)
senderChannelWrapper.close()
})
.catch(err => {
console.error('[AMQP] - Message to queue => '+queue+ '<= was rejected! ', err.stack)
senderChannelWrapper.close()
})
}
и мой рабочий. js
const amqp = require('amqp-connection-manager')
const AMQP_URL = process.env.CLOUDAMQP_URL || 'amqp://localhost';
if (!AMQP_URL) process.exit(1)
const WORKER_QUEUE = 'worker-queue'
// Create a new connection manager from AMQP
var connection = amqp.connect([AMQP_URL])
console.log('[AMQP] - Connecting....')
connection.on('connect', function() {
process.once('SIGINT', function() { // Close conn on exit
connection.close()
})
return console.log('[AMQP] - Connected!')
})
connection.on('disconnect', function(params) {
return console.error('[AMQP] - Disconnected.', params.err.stack)
})
// ---------- To receive the execution task messages
let channelWrapper = connection.createChannel({
json: true,
setup: function(channel) {
return Promise.all([
channel.assertQueue(WORKER_QUEUE, { autoDelete: false, durable: true }),
channel.prefetch(1),
channel.consume(WORKER_QUEUE, onMessage)
])
}
})
channelWrapper.waitForConnect()
.then(function() {
console.log('[AMQP] - Listening for messages on queue => '+WORKER_QUEUE)
})
.catch(function(err) {
console.error('[AMQP] - Error! ', err)
})
// Process message from AMQP
function onMessage(data) {
let message
try {
message = JSON.parse(data.content.toString())
} catch(e) {
console.error('[AMQP] - Error parsing message... ', data)
}
console.log('[AMQP] - Message incoming... ', message)
channelWrapper.ack(data)
if (!message) {
return
}
switch (message.taskName) {
case 'createRenewalDeal':
// do something....
break
default:
console.error('No task was found with name => '+message.taskName)
}
}
Спасибо, что помогли мне понять, как связать все это вместе
Best,