ActiveMQ JMSXGroupId свойство не имеет никакого эффекта - PullRequest
0 голосов
/ 22 мая 2019

Я хочу сгруппировать сообщения, используя свойство JMSXGroupId. Но потребители получают все сообщения очереди без группировки

У меня есть локальный экземпляр ActiveMQ (версия 5.15.9, Windows 10, Java 8).
Я написал DoSend Java-класс для отправки текстовых сообщений и DoRead Java-класс для получения текстовых сообщений

import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

public class DoSend {
    public static void main(String[] args) throws JMSException {
        ConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616");

        Connection connection = null;
        try {
            connection = factory.createConnection("admin", "admin");
            connection.start();

            Session session = null;
            try {
                session = connection.createSession(true, Session.SESSION_TRANSACTED);

                MessageProducer producer = null;
                try {
                    Destination destination = session.createQueue("TEST");
                    producer = session.createProducer(destination);

                    int index = 0;
                    while (true) {
                        index += 1;

                        TextMessage message = session.createTextMessage(String.valueOf(index));

                        message.setStringProperty("JMSXGroupID", String.valueOf(index % 3));
                        producer.send(destination, message);
                        session.commit();

                        Thread.sleep(1000);
                    }
                } finally {
                    if (producer != null) {
                        producer.close();
                    }
                }

            } catch (Throwable th) {
                th.printStackTrace();

                if (session != null) {
                    session.rollback();
                }

                throw new RuntimeException(th);
            } finally {
                if (session != null) {
                    session.close();
                }
            }
        } finally {
            if (connection != null) {
                connection.close();
            }
        }
    }
}

import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

public class DoRead {
    public static void main(String[] args) throws JMSException {
        final ConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616");

        System.out.println("START " + Thread.currentThread());
        try {
            Connection connection = null;
            try {
                connection = factory.createConnection("admin", "admin");
                connection.start();

                Session session = null;
                try {
                    session = connection.createSession(true, Session.SESSION_TRANSACTED);

                    Destination destination = session.createQueue("TEST");
                    MessageConsumer consumer = null;
                    try {
                        consumer = session.createConsumer(destination);

                        TextMessage message;
                        while ((message = (TextMessage) consumer.receive(10000)) != null) {
                            System.out.println(Thread.currentThread() + ": message.index = " + message.getText());
                            System.out.println(Thread.currentThread() + ": message.JMSXGroupID = " + message.getStringProperty("JMSXGroupID"));
                            session.commit();
                        }
                    } finally {
                        if (consumer != null) {
                            consumer.close();
                        }
                    }
                } finally {
                    if (session != null) {
                        session.close();
                    }
                }
            } finally {
                if (connection != null) {
                    connection.close();
                }
            }
        } catch (JMSException e) {
            e.printStackTrace();
        } finally {
            System.out.println("FINISH " + Thread.currentThread());
        }
    }
}

Я ожидаю, что любой потребитель получит сообщения с тем же JMSXGroupId, но теперь первый потребитель получает все сообщения, а остальные ничего не получают.

...