Даже если сообщение получено MessageListener, я не хочу удалять его из очереди, я хочу выполнить некоторую обработку в методе onMessage и на основе результата:
Я хочу commit();
для успеха - поэтому сообщение будет полностью удалено из очереди.
Для сбоев - не фиксировать - rollback();
, поэтому сообщение будет доставлено (по умолчанию несколько раз) и затем отправлено в очередь недоставленных сообщений (DLQ).Это нормально для нас.
Я использую: SpringBoot и hornetq (spring-boot-starter-hornetq-1.4.7.RELEASE).Настройки:
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.core.env.Environment;
import org.springframework.jms.config.DefaultJmsListenerContainerFactory;
import org.springframework.jms.config.JmsListenerContainerFactory;
import org.springframework.jms.connection.UserCredentialsConnectionFactoryAdapter;
import org.springframework.jndi.JndiObjectFactoryBean;
import org.springframework.jndi.JndiTemplate;
import javax.jms.ConnectionFactory;
import javax.naming.Context;
import javax.naming.NamingException;
import java.util.Properties;
import static com.test.hornetq.Receiver.LOG;
import static javax.jms.Session.SESSION_TRANSACTED;
@Configuration
public class JmsConfig {
private String host;
private String port;
private String connectionFactoryJndiName;
private String jndiInit;
private String user;
private String password;
private String jmsReceiverConcurrency;
public JmsConfig(final Environment env) {
host = env.getProperty("host");
port = env.getProperty("port");
connectionFactoryJndiName = env.getProperty("connectionfactory.jndiname");
jndiInit = env.getProperty("jndiInit");
user = env.getProperty("user");
password = env.getProperty("password");
jmsReceiverConcurrency = env.getProperty("jmsReceiverConcurrency");
}
@Bean
public JndiTemplate jndiTemplate() {
final JndiTemplate jndiTemplate = new JndiTemplate();
jndiTemplate.setEnvironment(getProperties());
return jndiTemplate;
}
@Bean
public JndiObjectFactoryBean jmsConnectionFactory() throws NamingException {
final JndiObjectFactoryBean jndiObjectFactoryBean = new JndiObjectFactoryBean();
jndiObjectFactoryBean.setJndiTemplate(jndiTemplate());
jndiObjectFactoryBean.setJndiName(connectionFactoryJndiName);
jndiObjectFactoryBean.afterPropertiesSet();
return jndiObjectFactoryBean;
}
@Bean
@Primary
public ConnectionFactory connectionFactory() throws NamingException {
final UserCredentialsConnectionFactoryAdapter adapter = new UserCredentialsConnectionFactoryAdapter();
adapter.setTargetConnectionFactory((ConnectionFactory) jmsConnectionFactory().getObject());
adapter.setUsername(user);
adapter.setPassword(password);
return adapter;
}
@Bean
public JmsListenerContainerFactory<?> myJmsContainerFactory() throws NamingException {
final DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
factory.setConnectionFactory(connectionFactory());
factory.setSubscriptionDurable(false);
factory.setConcurrency(jmsReceiverConcurrency);
factory.setMaxMessagesPerTask(1);
factory.setSessionTransacted(true);
factory.setSessionAcknowledgeMode(SESSION_TRANSACTED);
factory.setErrorHandler(t -> {
LOG.error("Error in listener!", t);
});
return factory;
}
private Properties getProperties() {
final Properties jndiProps = new Properties();
jndiProps.setProperty(Context.INITIAL_CONTEXT_FACTORY, jndiInit);
jndiProps.setProperty(Context.PROVIDER_URL, "http-remoting://" + host + ":" + port);
jndiProps.setProperty(Context.SECURITY_PRINCIPAL, user);
jndiProps.setProperty(Context.SECURITY_CREDENTIALS, password);
return jndiProps;
}
}
И получатель:
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;
import javax.jms.JMSException;
import javax.jms.MapMessage;
import javax.jms.Session;
@Component
public class Receiver {
@JmsListener(destination = "${destination.name}", containerFactory = "myJmsContainerFactory")
public void onReceive(final MapMessage message, Session session) throws JMSException {
try {
System.out.println(">>>> " + message);
session.rollback();
} catch (Exception ex) {
System.out.println(">>>>>>>>>>>>>>>>>>>>>>>>>>>>>THROW ");
throw ex;
}
}
}
Но когда я делаю rollback();
, ничего не происходит, и сообщение не возвращается.