RabbitMQ: setReturnListner handleBasicReturn не вызывается для недоставленных сообщений - PullRequest
0 голосов
/ 06 января 2011

Для одного из требований нам необходимо отслеживать глубину очереди и успешно обработанные сообщения.Идея состоит в том, чтобы публиковать сообщения и получать список успешных и неудачных сообщений.Чтобы смоделировать требование, я сделал следующее:

  1. Публикация сообщений с флагом Обязательный и Немедленный отправлено channel.basicPublish 'exchange', 'rKey', true, false, props, “Hello World” .bytes
  2. Потребитель потребляет даже помеченные (я поместил числа от 1..10 в качестве отмеченных значений в заголовке каждого сообщения) и не ACKS нечетные сообщения.
  3. Я реализовал setReturnListnere в издателедля захвата недоставленных сообщений.

Хотя я могу получить количество непакетированных сообщений через Rabbmitmqctl list_queues messages_unacknowledged, мой метод handleBasicReturn почему-то не вызывается.Я что-то упустил.

Фрагменты кода:

Производитель:

channel.setReturnListener(new ReturnListener() {
    public void handleBasicReturn(int replyCode, String replyText, String exchange,
                                  String routingKey, AMQP.BasicProperties properties,
                                  byte[] body) 
            throws IOException {
        println "Debugging messages!!!!"
        println "The details of returned messages are ${replyText} from  ${exchange} with routingKey as ${routingKey} with properties"
    }
});

println " queuename is ${dec.queue} and consumerCount is ${dec.consumerCount} messageCount is ${dec.messageCount}"
(1..10).each {
    println "Sending file ${i}....."
    def headers = new HashMap<String,Object>()
    headers.put "operatiion","scp"
    headers.put "dest","joker.dk.mach.com"
    headers.put "id", i
    println headers

    BasicProperties props = new BasicProperties(null, null, headers, null, null, null, null, null,null, null, null, null,null, null)
                    channel.basicPublish 'exchange' ,'rKey',true,false, props,"Hello Worls".bytes                                
    i++                         
}
channel.close()

Потребитель:

while (true) {
    def delivery = consumer.nextDelivery()
    def headers = delivery?.properties?.headers
    def id = headers.get("id")
    println "Received message:"
    println " ${id.toString()}"

    if( id % 2 == 0){
        channel.basicAck delivery.envelope.deliveryTag, false                
    }    
}

1 Ответ

0 голосов
/ 26 февраля 2011

Читать это объяснение о том, как флаги немедленного и обязательного действия влияют на доставку сообщений в RabbitMQ.

В вашем случае, поскольку у вас есть получатель, получающий сообщения, сообщения не будут возвращены дажеесли потребитель никогда их не получит.

D.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...