Я экспериментировал с очередями служебной шины Azure в NodeJS.Я создал sender.js и listener.js на основе их кода в документации.Построение очереди работает нормально.Удаление из очереди и удаление сообщений из очереди работает нормально до тех пор, пока длина сообщения не достигнет 10. В этот момент запросы на удаление возвращают нулевые сообщения примерно 4 из 5 раз.Если я продолжу зацикливать запросы на удаление, в конечном итоге он удалит и удалит эти последние 10 сообщений.Но это кажется крайне неэффективным.Кто-нибудь еще сталкивался с этой проблемой?
listener.js
var azure = require('azure');
var async = require("async");
var connectionString = process.env.CONNECTION_STRING || "Endpoint=sb://endpoint"; // dev
console.log(process.env.CONNECTION_STRING);
var serviceBusService = azure.createServiceBusService(connectionString);
// var serviceBusService = azure.createServiceBusService();
exports.createQueue = function (req,res) {
var body = req.body;
serviceBusService.createQueueIfNotExists(body.queueName, function(error){
console.log(error);
if(!error){
// Queue exists
return res.send(200);
} else {
return res.send(500, error);
}
});
};
exports.sendMessageToQueue = function (req, res) {
var body = req.body;
var message = {
body: 'Test message',
customProperties: {
testproperty: 'TestValue'
}};
serviceBusService.sendQueueMessage(body.queueName, message, function(error){
if(!error){
// message sent
return res.send(200);
} else {
return res.send(500, error);
}
});
}
exports.receiveMessageFromQueue = function (req, res) {
var body = req.body;
serviceBusService.receiveQueueMessage(body.queueName, function(error, receivedMessage){
if(!error){
console.log(receivedMessage);
// Message received and deleted
return res.send(200,receivedMessage);
} else {
return res.send(500, error);
}
});
}
function _receiveMessageFromQueue(queueName,delayTimeIfQueueIsEmpty,callback) {
serviceBusService.receiveQueueMessage(queueName, function(error, receivedMessage){
console.log(error, receivedMessage);
// console.log(error);
if (error == 'No messages to receive') {
// call the rest of the code and have it execute after 30 seconds
setTimeout(function() {
callback(receivedMessage);
}, delayTimeIfQueueIsEmpty);
} else {
// callback immediately
callback(receivedMessage);
}
});
}
function _sendQueueMessage(queueName,message,callback) {
serviceBusService.sendQueueMessage(queueName, message, function(error){
console.log(error);
callback();
});
}
function listenMessageQueue(concurrency,delayTimeIfQueueIsEmpty,queueName) {
var taskHandler = function(task, done) {
_receiveMessageFromQueue(task.queueName, delayTimeIfQueueIsEmpty, function(message) {
if (message) {
console.log('hello ' + message.body);
}
myQueue.push({ id: task.id + 1, queueName: queueName, url: "http://localhost/get-person/" + task.id + 1});
done();
});
};
var queueSize = concurrency;
var myQueue = async.queue(taskHandler, queueSize);
myQueue.drain = function() {
console.log("All the work has been done.");
}
for(var i = 0; i < concurrency; i++) {
myQueue.push({ id: i, queueName: queueName, url: "http://localhost/get-person/"+i });
}
}
delayTimeIfQueueIsEmpty = 30000; // 30s
concurrency = 2;
queueName = "jobs";
// listen and dequeue message from azure message bus
listenMessageQueue(concurrency,delayTimeIfQueueIsEmpty,queueName);
sender.js
var azure = require('azure');
var async = require("async");
var connectionString = process.env.CONNECTION_STRING || "Endpoint=sb://endpoint";
console.log(process.env.CONNECTION_STRING);
var serviceBusService = azure.createServiceBusService(connectionString);
exports.createQueue = function (req,res) {
var body = req.body;
serviceBusService.createQueueIfNotExists(body.queueName, function(error){
console.log(error);
if(!error){
// Queue exists
return res.send(200);
} else {
return res.send(500, error);
}
});
};
exports.sendMessageToQueue = function (req, res) {
var body = req.body;
var message = {
body: 'Test message',
customProperties: {
testproperty: 'TestValue'
}};
serviceBusService.sendQueueMessage(body.queueName, message, function(error){
if(!error){
// message sent
return res.send(200);
} else {
return res.send(500, error);
}
});
}
exports.receiveMessageFromQueue = function (req, res) {
var body = req.body;
serviceBusService.receiveQueueMessage(body.queueName, function(error, receivedMessage){
if(!error){
console.log(receivedMessage);
// Message received and deleted
return res.send(200,receivedMessage);
} else {
return res.send(500, error);
}
});
}
function _receiveMessageFromQueue(queueName,delayTimeIfQueueIsEmpty,callback) {
serviceBusService.receiveQueueMessage(queueName, function(error, receivedMessage){
console.log(error, receivedMessage);
// console.log(error);
if (error == 'No messages to receive') {
// call the rest of the code and have it execute after 30 seconds
setTimeout(function() {
callback(receivedMessage);
}, delayTimeIfQueueIsEmpty);
} else {
// callback immediately
callback(receivedMessage);
}
});
}
function _sendQueueMessage(queueName,message,callback) {
serviceBusService.sendQueueMessage(queueName, message, function(error){
console.log(error);
callback();
});
}
function listenMessageQueue(concurrency,delayTimeIfQueueIsEmpty,queueName) {
var taskHandler = function(task, done) {
_receiveMessageFromQueue(task.queueName, delayTimeIfQueueIsEmpty, function(message) {
if (message) {
console.log('hello ' + message.body);
}
myQueue.push({ id: task.id + 1, queueName: queueName, url: "http://localhost/get-person/" + task.id + 1});
done();
});
};
var queueSize = concurrency;
var myQueue = async.queue(taskHandler, queueSize);
myQueue.drain = function() {
console.log("All the work has been done.");
}
for(var i = 0; i < concurrency; i++) {
myQueue.push({ id: i, queueName: queueName, url: "http://localhost/get-person/"+i });
}
}
function pushMessageQueue(concurrency,queueName) {
var taskHandler = function(task, done) {
var message = {
body: String(task.id),
customProperties: {
url: task.url
}};
_sendQueueMessage(task.queueName, message, function() {
console.log('hello ' + task.id);
myQueue.push({ id: task.id + 1, queueName: queueName, url: "http://localhost/get-person/" + task.id + 1});
done();
});
};
var queueSize = concurrency;
var myQueue = async.queue(taskHandler, queueSize);
myQueue.drain = function() {
console.log("All the work has been done.");
}
for(var i = 0; i < concurrency; i++) {
myQueue.push({ id: i, queueName: queueName, url: "http://localhost/get-person/"+i });
}
}
concurrency = 2;
queueName = "jobs";
pushMessageQueue(concurrency,queueName); // push message to queue for testing: 100 messages per call