Запрос на удаление из очереди в служебной шине Azure, когда длина очереди <10 часто возвращает ноль - PullRequest
0 голосов
/ 18 мая 2018

Я экспериментировал с очередями служебной шины Azure в NodeJS.Я создал sender.js и listener.js на основе их кода в документации.Построение очереди работает нормально.Удаление из очереди и удаление сообщений из очереди работает нормально до тех пор, пока длина сообщения не достигнет 10. В этот момент запросы на удаление возвращают нулевые сообщения примерно 4 из 5 раз.Если я продолжу зацикливать запросы на удаление, в конечном итоге он удалит и удалит эти последние 10 сообщений.Но это кажется крайне неэффективным.Кто-нибудь еще сталкивался с этой проблемой?

listener.js

var azure = require('azure');
var async = require("async");

var connectionString = process.env.CONNECTION_STRING || "Endpoint=sb://endpoint"; // dev

console.log(process.env.CONNECTION_STRING);

var serviceBusService = azure.createServiceBusService(connectionString);
// var serviceBusService = azure.createServiceBusService();

exports.createQueue = function (req,res) {

    var body = req.body;

    serviceBusService.createQueueIfNotExists(body.queueName, function(error){
        console.log(error);
        if(!error){
            // Queue exists
            return res.send(200);
        } else {
            return res.send(500, error);   
        }
    });
};

exports.sendMessageToQueue = function (req, res) {
    var body = req.body;

    var message = {
        body: 'Test message',
        customProperties: {
            testproperty: 'TestValue'
        }};

    serviceBusService.sendQueueMessage(body.queueName, message, function(error){
        if(!error){
            // message sent
            return res.send(200);
        } else {
            return res.send(500, error);   
        }
    });
}

exports.receiveMessageFromQueue = function (req, res) {
    var body = req.body;

    serviceBusService.receiveQueueMessage(body.queueName, function(error, receivedMessage){
        if(!error){
            console.log(receivedMessage);

            // Message received and deleted
            return res.send(200,receivedMessage);
        }  else {
            return res.send(500, error);   
        }
    });
}

function _receiveMessageFromQueue(queueName,delayTimeIfQueueIsEmpty,callback) {
    serviceBusService.receiveQueueMessage(queueName, function(error, receivedMessage){
        console.log(error, receivedMessage);
        // console.log(error);
        if (error == 'No messages to receive') {
            // call the rest of the code and have it execute after 30 seconds
            setTimeout(function() {
                callback(receivedMessage);
            }, delayTimeIfQueueIsEmpty);
        } else {
            // callback immediately
            callback(receivedMessage);
        }
    });
}

function _sendQueueMessage(queueName,message,callback) {
    serviceBusService.sendQueueMessage(queueName, message, function(error){
        console.log(error);
        callback();
    });
}

function listenMessageQueue(concurrency,delayTimeIfQueueIsEmpty,queueName) {

    var taskHandler = function(task, done) {
        _receiveMessageFromQueue(task.queueName, delayTimeIfQueueIsEmpty, function(message) {
            if (message) {
                console.log('hello ' + message.body);
            }

            myQueue.push({ id: task.id + 1, queueName: queueName, url: "http://localhost/get-person/" + task.id + 1});

            done();
        });
      };

    var queueSize = concurrency;

    var myQueue = async.queue(taskHandler, queueSize);

    myQueue.drain = function() {
        console.log("All the work has been done.");
    }

    for(var i = 0; i < concurrency; i++) {
        myQueue.push({ id: i, queueName: queueName, url: "http://localhost/get-person/"+i });
    }

}

delayTimeIfQueueIsEmpty = 30000; // 30s
concurrency = 2;
queueName = "jobs";
// listen and dequeue message from azure message bus
listenMessageQueue(concurrency,delayTimeIfQueueIsEmpty,queueName);

sender.js

var azure = require('azure');
var async = require("async");

var connectionString = process.env.CONNECTION_STRING || "Endpoint=sb://endpoint";

console.log(process.env.CONNECTION_STRING);

var serviceBusService = azure.createServiceBusService(connectionString);

exports.createQueue = function (req,res) {

    var body = req.body;

    serviceBusService.createQueueIfNotExists(body.queueName, function(error){
        console.log(error);
        if(!error){
            // Queue exists
            return res.send(200);
        } else {
            return res.send(500, error);   
        }
    });
};

exports.sendMessageToQueue = function (req, res) {
    var body = req.body;

    var message = {
        body: 'Test message',
        customProperties: {
            testproperty: 'TestValue'
        }};

    serviceBusService.sendQueueMessage(body.queueName, message, function(error){
        if(!error){
            // message sent
            return res.send(200);
        } else {
            return res.send(500, error);   
        }
    });
}

exports.receiveMessageFromQueue = function (req, res) {
    var body = req.body;

    serviceBusService.receiveQueueMessage(body.queueName, function(error, receivedMessage){
        if(!error){
            console.log(receivedMessage);

            // Message received and deleted
            return res.send(200,receivedMessage);
        }  else {
            return res.send(500, error);   
        }
    });
}

function _receiveMessageFromQueue(queueName,delayTimeIfQueueIsEmpty,callback) {
    serviceBusService.receiveQueueMessage(queueName, function(error, receivedMessage){
        console.log(error, receivedMessage);
        // console.log(error);
        if (error == 'No messages to receive') {
            // call the rest of the code and have it execute after 30 seconds
            setTimeout(function() {
                callback(receivedMessage);
            }, delayTimeIfQueueIsEmpty);
        } else {
            // callback immediately
            callback(receivedMessage);
        }
    });
}

function _sendQueueMessage(queueName,message,callback) {
    serviceBusService.sendQueueMessage(queueName, message, function(error){
        console.log(error);
        callback();
    });
}

function listenMessageQueue(concurrency,delayTimeIfQueueIsEmpty,queueName) {

    var taskHandler = function(task, done) {
        _receiveMessageFromQueue(task.queueName, delayTimeIfQueueIsEmpty, function(message) {
            if (message) {
                console.log('hello ' + message.body);
            }

            myQueue.push({ id: task.id + 1, queueName: queueName, url: "http://localhost/get-person/" + task.id + 1});

            done();
        });
      };

    var queueSize = concurrency;

    var myQueue = async.queue(taskHandler, queueSize);

    myQueue.drain = function() {
        console.log("All the work has been done.");
    }

    for(var i = 0; i < concurrency; i++) {
        myQueue.push({ id: i, queueName: queueName, url: "http://localhost/get-person/"+i });
    }

}

function pushMessageQueue(concurrency,queueName) {

    var taskHandler = function(task, done) {

        var message = {
            body: String(task.id),
            customProperties: {
                url: task.url
            }};

        _sendQueueMessage(task.queueName, message, function() {
            console.log('hello ' + task.id);
            myQueue.push({ id: task.id + 1, queueName: queueName, url: "http://localhost/get-person/" + task.id + 1});
            done();
        });
      };

    var queueSize = concurrency;

    var myQueue = async.queue(taskHandler, queueSize);

    myQueue.drain = function() {
        console.log("All the work has been done.");
    }

    for(var i = 0; i < concurrency; i++) {
        myQueue.push({ id: i, queueName: queueName, url: "http://localhost/get-person/"+i });
    }

}

concurrency = 2;
queueName = "jobs";
pushMessageQueue(concurrency,queueName); // push message to queue for testing: 100 messages per call

1 Ответ

0 голосов
/ 08 июня 2018

наконец-то смог связаться со службой поддержки Azure и нашел ответ.ServiceBus по умолчанию включает разделение.При выполнении http-запросов (NodeJS SDK для Azure ServiceBus выполняет http-вызовы REST), когда количество сообщений низкое, это может привести к созданию разделов с различными наборами сообщений, поскольку у них не было возможности синхронизироваться.Эта проблема решается путем создания новой очереди, которая отключает разбиение на разделы, или путем увеличения поддержки активности или с помощью DotNet SDK, который позволяет выполнять запросы https.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...