У меня есть однопоточный потребитель ActiveMQ, написанный на Java.Все, что я пытаюсь сделать, это получить () сообщение из очереди, попытаться отправить его в веб-службу, и, если это удастся, подтвердить () его.Если вызов веб-службы завершится неудачно, я хочу, чтобы сообщение оставалось в очереди и повторно отправлялось через некоторое время.
Это более или менее работает, за исключением повторной отправки: каждый раз, когда я перезапускаю своего потребителя, оно получаетодно сообщение для каждого, которое все еще находится в очереди, но после неудачной отправки сообщения никогда не отправляются повторно.
Мой код выглядит так:
public boolean init() throws JMSException, FileNotFoundException, IOException {
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(user, password, url);
RedeliveryPolicy policy = new RedeliveryPolicy();
policy.setInitialRedeliveryDelay(500);
policy.setBackOffMultiplier(2);
policy.setUseExponentialBackOff(true);
connectionFactory.setRedeliveryPolicy(policy);
connectionFactory.setUseRetroactiveConsumer(true); // ????
Connection connection = connectionFactory.createConnection();
connection.setExceptionListener(this);
connection.start();
session = connection.createSession(transacted, ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE);
destination = session.createQueue(subject); //???
consumer = session.createConsumer(destination);
//consumer.setMessageListener(this); // message listener had same behaviour
}
private void process() {
while(true) {
System.out.println("Waiting...");
try {
Message message = consumer.receive();
onMessage(message);
} catch (JMSException e) {
e.printStackTrace();
}
try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
@Override
public void onMessage(Message message) {
System.out.println("onMessage");
messagesReceived++;
if (message instanceof TextMessage) {
try {
TextMessage txtMsg = (TextMessage) message;
String msg = txtMsg.getText();
if(!client.sendMessage(msg)) {
System.out.println("Webservice call failed. Keeping message");
//message.
} else {
message.acknowledge();
}
if (transacted) {
if ((messagesReceived % batch) == 0) {
System.out.println("Commiting transaction for last " + batch + " messages; messages so far = " + messagesReceived);
session.commit();
}
}
} catch (JMSException e) {
e.printStackTrace();
}
}
}
В настоящее время я не использую транзакции(может быть, мне следовало бы?).
Я уверен, что упускаю что-то легкое и скоро буду бить себя по лбу, но я не могу понять, как это должно работать.Спасибо!
РЕДАКТИРОВАТЬ: Не могу ответить на это сам, как недостаточно rep:
ОК, после еще нескольких экспериментов оказывается, что транзакции - единственный способ сделать это.Вот новый код:
public boolean init() throws JMSException, FileNotFoundException, IOException {
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(user, password, url);
RedeliveryPolicy policy = new RedeliveryPolicy();
policy.setInitialRedeliveryDelay(1000L);
policy.setMaximumRedeliveries(RedeliveryPolicy.NO_MAXIMUM_REDELIVERIES);
connectionFactory.setRedeliveryPolicy(policy);
connectionFactory.setUseRetroactiveConsumer(true);
Connection connection = connectionFactory.createConnection();
connection.setExceptionListener(this);
connection.start();
session = connection.createSession(transacted, ActiveMQSession.CLIENT_ACKNOWLEDGE);
destination = session.createQueue(subject);
consumer = session.createConsumer(destination);
}
@Override
public void onMessage(Message message) {
System.out.println("onMessage");
messagesReceived++;
if (message instanceof TextMessage) {
try {
TextMessage txtMsg = (TextMessage) message;
String msg = txtMsg.getText();
if(client.sendMessage(msg)) {
if(transacted) {
System.out.println("Call succeeded - committing message");
session.commit();
}
//message.acknowledge();
} else {
if(transacted) {
System.out.println("Webservice call failed. Rolling back message");
session.rollback();
}
}
} catch (JMSException e) {
e.printStackTrace();
}
}
}
Теперь сообщение отправляется каждые 1000 мс, как указано в Политике повторной доставки.
Надеюсь, это поможет кому-то еще!:)