Подтвержденные сообщения доставляются при повторном подключении аварийного основного клиента. - PullRequest
0 голосов
/ 19 ноября 2018

Я установил автономный экземпляр HornetQ, который работает локально. Для тестирования я создал потребителя, использующего API HornetQ core , который будет получать сообщение каждые 500 миллисекунд.

Я сталкиваюсь со странным поведением на стороне потребителя, когда мой клиент подключается и читает все сообщения из очереди, и если я принудительно выключил это (без надлежащего закрытия сеанса / соединения), то при следующем запуске этого потребителя он снова будет читать старые сообщения из очереди. Вот мой потребительский пример:

// Код потребителя HornetQ

   public void readMessage() {
    ClientSession session = null;
    try {
        if (sf != null) {
            session = sf.createSession(true, true);

            ClientConsumer messageConsumer = session.createConsumer(JMS_QUEUE_NAME);
            session.start();

            while (true) {
                ClientMessage messageReceived = messageConsumer.receive(1000);
                if (messageReceived != null && messageReceived.getStringProperty(MESSAGE_PROPERTY_NAME) != null) {
                    System.out.println("Received JMS TextMessage:" + messageReceived.getStringProperty(MESSAGE_PROPERTY_NAME));
                    messageReceived.acknowledge();
                }

                Thread.sleep(500);
            }
        }
    } catch (Exception e) {
        LOGGER.error("Error while adding message by producer.", e);
    } finally {
        try {
            session.close();
        } catch (HornetQException e) {
            LOGGER.error("Error while closing producer session,", e);
        }
    }
}

Может кто-нибудь сказать мне, почему он работает так, и какую конфигурацию я должен использовать на стороне клиент / сервер, чтобы, если сообщение, прочитанное потребителем, удаляло это из очереди?

1 Ответ

0 голосов
/ 20 ноября 2018

Вы не фиксируете сеанс после завершения подтверждений и не создаете сеанс с включенной автоматической фиксацией для подтверждений.Следовательно, вам следует выполнить одно из следующих действий:

  • Либо явно вызвать session.commit() после одного или нескольких вызовов acknowledge()
  • , либо включить неявную автоматическую фиксацию для подтверждений, создавсеанс с использованием sf.createSession(true,true) или sf.createSession(false,true) (логическое значение, которое управляет автоматической фиксацией для подтверждений, является вторым).

Имейте в виду, что при включении автоматической фиксации для подтверждений возникаетвнутренний буфер, который должен достигнуть определенного размера, прежде чем подтверждения будут сброшены посреднику.Подобные пакетные подтверждения могут радикально улучшить производительность в некоторых случаях большого объема использования.По умолчанию вам необходимо подтвердить сообщения объемом 1 048 576 байт, чтобы очистить буфер и отправить подтверждения брокеру.Вы можете изменить размер этого буфера, вызвав setAckBatchSize в вашем экземпляре ServerLocator или используя другой метод createSession (например, sf.createSession(true, true, myAckBatchSize)).

Если буфер подтверждения не очищен и ваш клиент падает, соответствующие сообщения все еще будут в очереди, когда клиент вернется.Если буфер не достиг своего порога, он все равно будет очищен, когда потребитель будет корректно закрыт.

...