Не работает откат () в SessionAwareMessageListener - PullRequest
0 голосов
/ 31 мая 2018

Даже если сообщение получено MessageListener, я не хочу удалять его из очереди, я хочу выполнить некоторую обработку в методе onMessage и на основе результата:

Я хочу commit(); для успеха - поэтому сообщение будет полностью удалено из очереди.

Для сбоев - не фиксировать - rollback();, поэтому сообщение будет доставлено (по умолчанию несколько раз) и затем отправлено в очередь недоставленных сообщений (DLQ).Это нормально для нас.

Я использую: SpringBoot и hornetq (spring-boot-starter-hornetq-1.4.7.RELEASE).Настройки:

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.core.env.Environment;
import org.springframework.jms.config.DefaultJmsListenerContainerFactory;
import org.springframework.jms.config.JmsListenerContainerFactory;
import org.springframework.jms.connection.UserCredentialsConnectionFactoryAdapter;
import org.springframework.jndi.JndiObjectFactoryBean;
import org.springframework.jndi.JndiTemplate;

import javax.jms.ConnectionFactory;
import javax.naming.Context;
import javax.naming.NamingException;
import java.util.Properties;

import static com.test.hornetq.Receiver.LOG;
import static javax.jms.Session.SESSION_TRANSACTED;

@Configuration
public class JmsConfig {

    private String host;
    private String port;
    private String connectionFactoryJndiName;
    private String jndiInit;
    private String user;
    private String password;
    private String jmsReceiverConcurrency;

    public JmsConfig(final Environment env) {
        host = env.getProperty("host");
        port = env.getProperty("port");
        connectionFactoryJndiName = env.getProperty("connectionfactory.jndiname");
        jndiInit = env.getProperty("jndiInit");
        user = env.getProperty("user");
        password = env.getProperty("password");
        jmsReceiverConcurrency = env.getProperty("jmsReceiverConcurrency");
    }

    @Bean
    public JndiTemplate jndiTemplate() {
        final JndiTemplate jndiTemplate = new JndiTemplate();
        jndiTemplate.setEnvironment(getProperties());

        return jndiTemplate;
    }

    @Bean
    public JndiObjectFactoryBean jmsConnectionFactory() throws NamingException {
        final JndiObjectFactoryBean jndiObjectFactoryBean = new JndiObjectFactoryBean();
        jndiObjectFactoryBean.setJndiTemplate(jndiTemplate());
        jndiObjectFactoryBean.setJndiName(connectionFactoryJndiName);
        jndiObjectFactoryBean.afterPropertiesSet();

        return jndiObjectFactoryBean;
    }

    @Bean
    @Primary
    public ConnectionFactory connectionFactory() throws NamingException {
        final UserCredentialsConnectionFactoryAdapter adapter = new UserCredentialsConnectionFactoryAdapter();
        adapter.setTargetConnectionFactory((ConnectionFactory) jmsConnectionFactory().getObject());
        adapter.setUsername(user);
        adapter.setPassword(password);

        return adapter;
    }

    @Bean
    public JmsListenerContainerFactory<?> myJmsContainerFactory() throws NamingException {
        final DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory());
        factory.setSubscriptionDurable(false);
        factory.setConcurrency(jmsReceiverConcurrency);
        factory.setMaxMessagesPerTask(1);
        factory.setSessionTransacted(true);
        factory.setSessionAcknowledgeMode(SESSION_TRANSACTED);
        factory.setErrorHandler(t -> {
            LOG.error("Error in listener!", t);
        });


        return factory;
    }

    private Properties getProperties() {
        final Properties jndiProps = new Properties();
        jndiProps.setProperty(Context.INITIAL_CONTEXT_FACTORY, jndiInit);
        jndiProps.setProperty(Context.PROVIDER_URL, "http-remoting://" + host + ":" + port);
        jndiProps.setProperty(Context.SECURITY_PRINCIPAL, user);
        jndiProps.setProperty(Context.SECURITY_CREDENTIALS, password);
        return jndiProps;
    }

}

И получатель:

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;

import javax.jms.JMSException;
import javax.jms.MapMessage;
import javax.jms.Session;

@Component
public class Receiver {

    @JmsListener(destination = "${destination.name}", containerFactory = "myJmsContainerFactory")
    public void onReceive(final MapMessage message, Session session) throws JMSException {
        try {
            System.out.println(">>>> " + message);
            session.rollback();
        } catch (Exception ex) {
            System.out.println(">>>>>>>>>>>>>>>>>>>>>>>>>>>>>THROW ");
            throw ex;
        }
    }
}

Но когда я делаю rollback();, ничего не происходит, и сообщение не возвращается.

1 Ответ

0 голосов
/ 11 июня 2018

код работает.Проблема была в настройках hornetq на стороне сервера.

<pre-acknowledge>true</pre-acknowledge>

Дополнительные режимы подтверждения

Обратите внимание, что если вы используете режим предварительного подтверждения, вы потеряете транзакциюсемантика для используемых сообщений, так как они сначала подтверждаются на сервере, а не при фиксации транзакции.Это может указывать на очевидное, но нам хотелось бы прояснить это, чтобы избежать путаницы!

...