Solace нарушает спецификацию JMS, не подтверждая предыдущие сообщения? - PullRequest
0 голосов
/ 24 мая 2018

Спецификация JMS 1.1, раздел 4.4.11, гласит: «Подтверждение использованного сообщения автоматически подтверждает получение всех сообщений, которые были доставлены его сеансом».

Однако это не то поведение, котороеЯ наблюдаю с Утешением.Я написал следующую программу из 100 строк, которая отправляет двадцать сообщений, затем читает сообщения и переключается между их подтверждением и удалением.В результате все четные сообщения остаются в очереди.

Так что, Solace нарушает спецификацию JMS или я что-то упускаю?

package com.example;

import java.util.function.Predicate;

import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.QueueConnection;
import javax.jms.QueueSession;
import javax.jms.TextMessage;

import com.solacesystems.jms.SolConnectionFactory;
import com.solacesystems.jms.SolJmsUtility;
import com.solacesystems.jms.SupportedProperty;

public class SolaceAckTest {

    private static final String host = "localhost";
    private static final String username = "MyUser";
    private static final String password = "MyPassword";

    private static final String COUNTER_PROPERTY_NAME = "MyCounter";

    private static final String QUEUE_NAME = "MATCHED_1";

    private static final int NUM_MESSAGES_TO_SEND = 20;

    private static final long SENDING_INTERVAL_IN_MILLISECONDS = 100;

    /**
     * Determines on which messages we should call
     * {@link Message#acknowledge()}.
     */
    private static final Predicate<Message> SELECTOR_OF_MESSAGES_TO_ACKNOWLEDGE = new Predicate<Message>() {
        @Override
        public boolean test(Message m) {
            try {
                return (m.getIntProperty(COUNTER_PROPERTY_NAME) % 2) == 1;
            } catch (Exception e) {
                e.printStackTrace();
                return false;
            }
        }
    };

    public static void main(String[] args) throws Exception {

        SolConnectionFactory connectionFactory = SolJmsUtility.createConnectionFactory();
        connectionFactory.setHost(host);
        connectionFactory.setUsername(username);
        connectionFactory.setPassword(password);
        connectionFactory.setRespectTTL(true);

        QueueConnection queueConnection = connectionFactory.createQueueConnection();
        QueueSession queueSession = queueConnection.createQueueSession(false, SupportedProperty.SOL_CLIENT_ACKNOWLEDGE);
        Destination requestDest = queueSession.createQueue(QUEUE_NAME);
        queueSession.createConsumer(requestDest).setMessageListener(new MessageListenerThatAcknowledgesSomeMessages());
        MessageProducer messageProducer = queueSession.createProducer(requestDest);

        queueConnection.start();

        for (int counter = 1; counter <= NUM_MESSAGES_TO_SEND; counter++) {
            TextMessage msg = queueSession.createTextMessage();
            msg.setText("Message #" + counter);
            msg.setIntProperty(COUNTER_PROPERTY_NAME, counter);
            messageProducer.send(msg);
            Thread.sleep(SENDING_INTERVAL_IN_MILLISECONDS);
        }

        // Prevent the program from terminating.
        Thread.sleep(1000);
    }

    /**
     * A listener that calls {@link Message#acknowledge()} only on messages that
     * meet the criteria specified by
     * {@link SolaceAckTest#SELECTOR_OF_MESSAGES_TO_ACKNOWLEDGE}.
     */
    private static class MessageListenerThatAcknowledgesSomeMessages implements MessageListener {
        public MessageListenerThatAcknowledgesSomeMessages() {
        }

        @Override
        public void onMessage(Message msg) {
            try {
                final String text = ((TextMessage) msg).getText();
                if (SELECTOR_OF_MESSAGES_TO_ACKNOWLEDGE.test(msg)) {
                    msg.acknowledge();
                    System.out.println("Acknowledging message: " + text);
                } else {
                    System.out.println("Not acknowledging message: " + text);
                }
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }
    }
}

1 Ответ

0 голосов
/ 24 мая 2018

Я верю, что Тим прав.В вашем примере похоже, что вы используете расширение Solace, а не стандартный режим подтверждения клиента JMS.Пожалуйста, попробуйте указать стандартный режим подтверждения клиента JMS при создании сеанса.Например:

 QueueSession queueSession = queueConnection.createQueueSession(false, Session.CLIENT_ACKNOWLEDGE);

Исходное расширение SOL_CLIENT_ACKNOWLEDGE, которое вы изначально указали, позволяет вам подтверждать определенное сообщение без неявного подтверждения всех других сообщений, полученных сеансом.Это полезно, если у вас есть несколько рабочих потоков, обрабатывающих сообщения из сеанса.Каждый поток может подтвердить свое сообщение, когда он завершил обработку, неявно подтверждая сообщения, обрабатываемые другими потоками.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...