Тайм-аут на уровне потребителя ActiveMQ - PullRequest
0 голосов
/ 03 октября 2018

Я пытаюсь создать тайм-аут на уровне потребителя в Active MQ (версия 5.15.0).Предположим, что одно сообщение выбрано потребителем, но не может его подтвердить, поэтому в этом случае я хочу, чтобы время ожидания потребителя было таким, чтобы это же сообщение могло быть выбрано другим потребителем, слушающим брокера.

Код моего производителя, где я устанавливаю двапотребительский слушатель:

public class JmsMessageListenerAckExample {
  public static void main(String[] args) throws URISyntaxException, Exception {
    Connection connection = null;
    try {
      // Producer
      ConnectionFactory factory = createActiveMQConnectionFactory();
      connection = factory.createConnection();
      Session session = connection.createSession(false,
          Session.CLIENT_ACKNOWLEDGE);
      Queue queue = session.createQueue("customerQueue");
      String payload = "Important Task";
      Message msg = session.createTextMessage(payload);
      MessageProducer producer = session.createProducer(queue);

      System.out.println("Sending text '" + payload + "'");
      producer.send(msg);

      // Consumer
      MessageConsumer consumer1 = session.createConsumer(queue);
      consumer1.setMessageListener(
          new AckMessageListener(false, "consumer1"));
      Thread.sleep(1000);
      System.out.println("Creating new message listener to acknowledge");
      producer.send(msg);
      MessageConsumer consumer2 = session.createConsumer(queue);
      consumer2.setMessageListener(
          new AckMessageListener(true, "consumer2"));
      connection.start();

      Thread.sleep(3000);
      session.close();
    } finally {
      if (connection != null) {
        connection.close();
      }
    }
  }

  private static ActiveMQConnectionFactory createActiveMQConnectionFactory() {
    // Create a connection factory.
    final ActiveMQConnectionFactory connectionFactory =
        new ActiveMQConnectionFactory("tcp://localhost:61616");

    // Pass the username and password.
    connectionFactory.setUserName("user");
    connectionFactory.setPassword("user");
    return connectionFactory;
  }
}

Это мой потребительский слушатель:

public class AckMessageListener implements MessageListener {
  private boolean acknowledge;
  private String consumerName;

  public AckMessageListener(boolean acknowledge, String consumerName) {
    this.acknowledge = acknowledge;
    this.consumerName = consumerName;
  }

  public void onMessage(Message message) {
    boolean terminate = !acknowledge;
    try {

      System.out.println("ConsumerName="+consumerName+", Acknowledge="+acknowledge);
      if (acknowledge) {
        try {
          message.acknowledge();
        } catch (JMSException e1) {
          e1.printStackTrace();
        }
      }

      System.out.println(message);
    } catch (InterruptedException e) {
      e.printStackTrace();
    } finally {
      if (terminate) {
        Thread.currentThread().interrupt();
      }
    }
  }
}

Я хочу смоделировать таким образом, что consumer1 слушает сообщение, но не подтверждает его, так что время ожидания истекло.пытаюсь освободить поток, я ожидаю, что мой consumer2 подберет его и подтвердит сообщение, чтобы сообщение перешло из состояния «Сообщения в очередь» в состояние «Сообщения в очереди» в посреднике, но мой consumer2 не являетсявозможность получать любые сообщения о событиях.

Есть ли что-то, что я делаю не так.Как мне добиться тайм-аута на уровне потребителя с Active MQ?

1 Ответ

0 голосов
/ 03 октября 2018

Один из способов справиться с этим - использовать транзакции (http://activemq.apache.org/how-do-transactions-work.html).. Вы вызываете commit () в случае успеха, rollback () в случае сбоя или если сессия закрывается до вызова commit (), тогда произойдет повторная доставка (http://activemq.apache.org/message-redelivery-and-dlq-handling.html).

...