Я хочу сгруппировать сообщения, используя свойство JMSXGroupId. Но потребители получают все сообщения очереди без группировки
У меня есть локальный экземпляр ActiveMQ (версия 5.15.9, Windows 10, Java 8).
Я написал DoSend Java-класс для отправки текстовых сообщений и DoRead Java-класс для получения текстовых сообщений
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
public class DoSend {
public static void main(String[] args) throws JMSException {
ConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616");
Connection connection = null;
try {
connection = factory.createConnection("admin", "admin");
connection.start();
Session session = null;
try {
session = connection.createSession(true, Session.SESSION_TRANSACTED);
MessageProducer producer = null;
try {
Destination destination = session.createQueue("TEST");
producer = session.createProducer(destination);
int index = 0;
while (true) {
index += 1;
TextMessage message = session.createTextMessage(String.valueOf(index));
message.setStringProperty("JMSXGroupID", String.valueOf(index % 3));
producer.send(destination, message);
session.commit();
Thread.sleep(1000);
}
} finally {
if (producer != null) {
producer.close();
}
}
} catch (Throwable th) {
th.printStackTrace();
if (session != null) {
session.rollback();
}
throw new RuntimeException(th);
} finally {
if (session != null) {
session.close();
}
}
} finally {
if (connection != null) {
connection.close();
}
}
}
}
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
public class DoRead {
public static void main(String[] args) throws JMSException {
final ConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616");
System.out.println("START " + Thread.currentThread());
try {
Connection connection = null;
try {
connection = factory.createConnection("admin", "admin");
connection.start();
Session session = null;
try {
session = connection.createSession(true, Session.SESSION_TRANSACTED);
Destination destination = session.createQueue("TEST");
MessageConsumer consumer = null;
try {
consumer = session.createConsumer(destination);
TextMessage message;
while ((message = (TextMessage) consumer.receive(10000)) != null) {
System.out.println(Thread.currentThread() + ": message.index = " + message.getText());
System.out.println(Thread.currentThread() + ": message.JMSXGroupID = " + message.getStringProperty("JMSXGroupID"));
session.commit();
}
} finally {
if (consumer != null) {
consumer.close();
}
}
} finally {
if (session != null) {
session.close();
}
}
} finally {
if (connection != null) {
connection.close();
}
}
} catch (JMSException e) {
e.printStackTrace();
} finally {
System.out.println("FINISH " + Thread.currentThread());
}
}
}
Я ожидаю, что любой потребитель получит сообщения с тем же JMSXGroupId, но теперь первый потребитель получает все сообщения, а остальные ничего не получают.