Сообщения HornetQ все еще остаются в очереди после использования с использованием API ядра - PullRequest
7 голосов
/ 23 июня 2011

Я новичок в HornetQ, поэтому, пожалуйста, потерпите меня.Позвольте мне сначала рассказать вам о моих требованиях:

Мне нужно промежуточное программное обеспечение для организации очередей сообщений, которое может передавать сообщения размером около 1 КБ между различными процессами с низкой задержкой и постоянством (т.е. оно должно выдерживать сбои системы).У меня будет несколько процессов, пишущих в одну и ту же очередь, и несколько процессов, читающих из одной и той же очереди.

Для этого я выбрал HornetQ, так как он имеет лучший рейтинг для передачи сообщений с сохранением.

В настоящее время я использую Hornetq v2.2.2Final как автономный сервер .
Я могу успешно создать длительный / неочереди с использованием core api (ClientSession) и успешной отправки сообщений в очередь (ClientProducer) .
Аналогичным образом я могу читать сообщения оточередь с использованием ядра API (ClientConsumer) .

Проблема возникает после этого, когда клиент прочитал сообщение, сообщение все еще остается в очереди, то есть количество сообщений в очереди остается постоянным .Возможно, я ошибаюсь, но у меня сложилось впечатление, что как только сообщение израсходовано (read + ack) , оно удаляется из очереди. Но этого не происходит в моемслучае, и одни и те же сообщения читаются снова и снова.

Кроме того, я хотел бы сказать, что я пытался использовать недолговечные очереди с недолговечными сообщениями.но проблема остается .

Код для производителя, который я использую:

public class HQProducer implements Runnable {

    private ClientProducer producer;
    private boolean killme;
    private ClientSession session;
    private boolean durableMsg;

    public HQProducer(String host, int port, String address, String queueName,
            boolean deleteQ, boolean durable, boolean durableMsg, int pRate) {
        this.durableMsg = durableMsg;
        try {
            HashMap map = new HashMap();
            map.put("host", host);
            map.put("port", port);

            TransportConfiguration config = new TransportConfiguration(NettyConnectorFactory.class.getName(), map);

            ServerLocator locator = HornetQClient.createServerLocatorWithoutHA(config);

            ClientSessionFactory factory = locator.createSessionFactory();

            session = factory.createSession();

            if (queueExists(queueName)) {
                if (deleteQ) {
                    System.out.println("Deleting existing queue :: " + queueName);
                    session.deleteQueue(queueName);
                    System.out.println("Creating queue :: " + queueName);
                    session.createQueue(address, queueName, true);
                }
            } else {
                System.out.println("Creating new  queue :: " + queueName);
                session.createQueue(address, queueName, durable);
            }
            producer = session.createProducer(SimpleString.toSimpleString(address), pRate);

            killme = false;
        } catch (Exception ex) {
            Logger.getLogger(HQTestProducer.class.getName()).log(Level.SEVERE, null, ex);
        }
    }

    @Override
    public void run() {
        long time = System.currentTimeMillis();
        int cnt = 0;
        long timediff;
        while (!killme) {
            try {
                ClientMessage message = session.createMessage(durableMsg);

                message.getBodyBuffer().writeString("Hello world");

                producer.send(message);
                cnt++;
                timediff = ((System.currentTimeMillis() - time) / 1000);
                if (timediff >= 1) {
                    System.out.println("Producer tps :: " + cnt);
                    cnt = 0;
                    time = System.currentTimeMillis();
                }
            } catch (HornetQException ex) {
                Logger.getLogger(HQProducer.class.getName()).log(Level.SEVERE, null, ex);
            }
        }
        try {
            session.close();
        } catch (HornetQException ex) {
            Logger.getLogger(HQProducer.class.getName()).log(Level.SEVERE, null, ex);
        }
    }

    public void setKillMe(boolean killme) {
        this.killme = killme;
    }

    private boolean queueExists(String qname) {
        boolean res = false;
        try {
            //ClientSession.BindingQuery bq = session.bindingQuery(SimpleString.toSimpleString(qname));
            QueueQuery queueQuery = session.queueQuery(SimpleString.toSimpleString(qname));
            if (queueQuery.isExists()) {
                res = true;
            }
        } catch (HornetQException ex) {
            res = false;
        }
        return res;
    }
}

Также код для потребителя:

public class HQConsumer implements Runnable {

    private ClientSession session;
    private ClientConsumer consumer;
    private boolean killMe;

    public HQConsumer(String host, int port, String queueName, boolean browseOnly) {
        try {
            HashMap map = new HashMap();
            map.put("host", host);
            map.put("port", port);

            TransportConfiguration config = new TransportConfiguration(NettyConnectorFactory.class.getName(), map);

            ServerLocator locator = HornetQClient.createServerLocatorWithoutHA(config);

            ClientSessionFactory factory = locator.createSessionFactory();

            session = factory.createSession();

            session.start();

            consumer = session.createConsumer(queueName, "",0,-1,browseOnly);

            killMe = false;
        } catch (Exception ex) {
            Logger.getLogger(HQTestProducer.class.getName()).log(Level.SEVERE, null, ex);
        }
    }

    @Override
    public void run() {
        long time = System.currentTimeMillis();
        int cnt = 0;
        long timediff;
        while (!killMe) {
            try {
                ClientMessage msgReceived = consumer.receive();
                msgReceived.acknowledge();
                //System.out.println("message = " + msgReceived.getBodyBuffer().readString());
                cnt++;
                timediff = ((System.currentTimeMillis() - time) / 1000);
                if (timediff >= 1) {
                    System.out.println("ConSumer tps :: " + cnt);
                    cnt = 0;
                    time = System.currentTimeMillis();
                }
            } catch (HornetQException ex) {
                Logger.getLogger(HQConsumer.class.getName()).log(Level.SEVERE, null, ex);
            }
        }
        try {
            session.close();
        } catch (HornetQException ex) {
            Logger.getLogger(HQConsumer.class.getName()).log(Level.SEVERE, null, ex);
        }
    }

    public void setKillMe(boolean killMe) {
        this.killMe = killMe;
    }
}

Конфигурация сервера HornetQ::

<configuration xmlns="urn:hornetq"
               xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
               xsi:schemaLocation="urn:hornetq /schema/hornetq-configuration.xsd">

   <paging-directory>${data.dir:../data}/paging</paging-directory>

   <bindings-directory>${data.dir:../data}/bindings</bindings-directory>

   <journal-directory>${data.dir:../data}/journal</journal-directory>

   <journal-min-files>10</journal-min-files>

   <large-messages-directory>${data.dir:../data}/large-messages</large-messages-directory>

   <connectors>
      <connector name="netty">
         <factory-class>org.hornetq.core.remoting.impl.netty.NettyConnectorFactory</factory-class>
         <param key="host"  value="${hornetq.remoting.netty.host:localhost}"/>
         <param key="port"  value="${hornetq.remoting.netty.port:5445}"/>
      </connector>

      <connector name="netty-throughput">
         <factory-class>org.hornetq.core.remoting.impl.netty.NettyConnectorFactory</factory-class>
         <param key="host"  value="${hornetq.remoting.netty.host:localhost}"/>
         <param key="port"  value="${hornetq.remoting.netty.batch.port:5455}"/>
         <param key="batch-delay" value="50"/>
      </connector>
   </connectors>

   <acceptors>
      <acceptor name="netty">
         <factory-class>org.hornetq.core.remoting.impl.netty.NettyAcceptorFactory</factory-class>
         <param key="host"  value="${hornetq.remoting.netty.host:localhost}"/>
         <param key="port"  value="${hornetq.remoting.netty.port:5445}"/>
      </acceptor>

      <acceptor name="netty-throughput">
         <factory-class>org.hornetq.core.remoting.impl.netty.NettyAcceptorFactory</factory-class>
         <param key="host"  value="${hornetq.remoting.netty.host:localhost}"/>
         <param key="port"  value="${hornetq.remoting.netty.batch.port:5455}"/>
         <param key="batch-delay" value="50"/>
         <param key="direct-deliver" value="false"/>
      </acceptor>
   </acceptors>

   <security-settings>
      <security-setting match="#">
         <permission type="createNonDurableQueue" roles="guest"/>
         <permission type="deleteNonDurableQueue" roles="guest"/>
         <permission type="createDurableQueue" roles="guest"/>
         <permission type="deleteDurableQueue" roles="guest"/>
         <permission type="consume" roles="guest"/>
         <permission type="send" roles="guest"/>
      </security-setting>
   </security-settings>

   <address-settings>
      <!--default for catch all-->
      <address-setting match="#">
         <dead-letter-address>jms.queue.DLQ</dead-letter-address>
         <expiry-address>jms.queue.ExpiryQueue</expiry-address>
         <redelivery-delay>0</redelivery-delay>
         <max-size-bytes>10485760</max-size-bytes>       
         <message-counter-history-day-limit>10</message-counter-history-day-limit>
         <address-full-policy>BLOCK</address-full-policy>
      </address-setting>
   </address-settings>

</configuration>

Ответы [ 2 ]

14 голосов
/ 25 июня 2011

С помощью API ядра hornetq вы должны явно подтвердить сообщение. Я не вижу, где это происходит в вашем тесте.

Если вы не проверяете, это причина, по которой ваши сообщения блокируются. Мне нужно увидеть ваш полный пример, чтобы дать вам полный ответ.

Также: вы должны определить ваш createSession с помощью: createSession (true, true, 0)

В базовом API есть возможность пакетного подтверждения. Вы не используете транзакционный сеанс, поэтому вы не будете отправлять подтверждения на сервер, пока не достигнете ackBatchSize, настроенного на вашем serverLocator. При этом любое подтверждение будет отправлено на сервер, как только вы позвоните в службу подтверждения () в своем сообщении.

Опция, которую вы используете в данный момент, эквивалентна JMS DUPS_OK ​​с определенным DUPS_SIZE.

(Сообщение отредактировало мой первоначальный ответ после некоторой итерации с вами)

2 голосов
/ 01 июля 2011

Установка ackbatchsize помогла мне решить проблему .. Спасибо за помощь

...