У меня есть сценарий потребителя ActiveMQ, работающий на Java, где я вызываю consumer.receive()
в цикле while(true)
.
Мне нужно ввести тайм-аут для каждого обработанного сообщения (например, если процесс обработки сообщений превышает 15 секунд, я должен получить следующее).
Я дал клиенту режим подтверждения для ACK.
Пожалуйста, посмотрите на метод consumeMessage
, в котором я реализовал потребление.
Желаемый результат:
Через 15 секунд первое сообщение должно быть отброшено (то есть оно не должно вызывать acknowledge()
). Вместо этого необходимо обработать следующее сообщение.
//package consumer;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.ExceptionListener;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
public class ActivmqConsumer implements ExceptionListener {
ActiveMQConnectionFactory connectionFactory = null;
Connection connection = null;
Session session = null;
public ActivmqConsumer() throws Exception{
String USERNAME = "admin";
String PASSWORD = "admin";
this.connectionFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, "tcp://192.168.56.101:61616?jms.prefetchPolicy.all=1");
// Create a Connection
this.connection = connectionFactory.createConnection();
connection.start();
connection.setExceptionListener(this);
// Create a Session
this.session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
}
public void consumeMessage(String destinationName, EventProcesser eventprocess){
Destination destination = null;
MessageConsumer consumer = null;
try{
// Create the destination (Topic or Queue)
destination = session.createQueue(destinationName);
// Create a MessageConsumer from the Session to the Topic or Queue
consumer = session.createConsumer(destination);
// Wait for a message
while(true){
Message message = consumer.receive(2);
if(message==null){
continue;
}
else if(message instanceof TextMessage) {
TextMessage textMessage = (TextMessage) message;
String text = textMessage.getText();
System.out.println("Received: " + text);
eventprocess.processEvent(text);
message.acknowledge();
} else{
System.out.println("Received: " + message);
}
}
}catch(Exception ex){
ex.printStackTrace();
}finally{
try{
consumer.close();
}catch(Exception ex){
ex.printStackTrace();
}
}
}
}