ActiveMQ игнорирует настройки приоритета - PullRequest
0 голосов
/ 02 апреля 2020

С этим кодом:

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.command.ActiveMQDestination;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

public class CompositeQueuePriority {
  public static void main(String[] args) throws Exception {
    String brokerUrl = "tcp://localhost:61616";

    BrokerService broker = new BrokerService();
    broker.addConnector(brokerUrl);
    broker.setPersistent(false);
    broker.setDestinationPolicy(policyMap());
    broker.start();

    Destination a = ActiveMQDestination.createDestination("queue", ActiveMQDestination.QUEUE_TYPE);

    Session session = createSession();

    MessageProducer lowProducer = session.createProducer(a);
    lowProducer.setPriority(1);

    MessageProducer highProducer = session.createProducer(a);
    highProducer.setPriority(9);

    MessageConsumer consumer = session.createConsumer(a);

    for (int i = 0; i < 10; i++) {
      lowProducer.send(session.createTextMessage("Low"));
      highProducer.send(session.createTextMessage("High"));

      String first = ((TextMessage) consumer.receive()).getText();
      String second = ((TextMessage) consumer.receive()).getText();

      System.out.println(first + ", " + second);
    }

    broker.stop();
  }

  private static Session createSession() throws JMSException {
    ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("vm://localhost");
    Connection connection = connectionFactory.createConnection();
    connection.start();
    return connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
  }

  private static PolicyMap policyMap() {
    PolicyMap policyMap = new PolicyMap();
    policyMap.setDefaultEntry(prioPolicyEntry());
    return policyMap;
  }

  private static PolicyEntry prioPolicyEntry() {
    PolicyEntry policyEntry = new PolicyEntry();
    policyEntry.setPrioritizedMessages(true);
    return policyEntry;
  }
}

Вывод:

Low, High
Low, High
Low, High
Low, High
Low, High
Low, High
Low, High
Low, High
Low, High
Low, High

Согласно tdocumentation, приоритет поддерживается начиная с 5.4, я использую 5.15. Я что-то не так делаю?

1 Ответ

2 голосов
/ 02 апреля 2020

Я полагаю, что ваша проблема в том, что потребитель уже был создан при отправке сообщений, что означает, что сообщения будут немедленно отправлены потребителю, как только их получит брокер, и, следовательно, сообщения не будут иметь возможности быть восстановленными. по приоритету.

Сначала отправьте все сообщения, а затем создайте своего потребителя.

...