Я изменил свой RabbitMQ из предыдущего поста (десериализация spring-rabbit JSON для содержимого ArrayList ), чтобы теперь использовать DirectMessageListener с MessagePostProcessors для GZip и GUnzip полезных нагрузок сообщений.
Однако, похоже, он не работает, поскольку точки останова не активированы, но также потому, что мои RabbitListener
s больше не получают сообщения, тогда как они сделали с SimpleMessageFactoryListenerContainer
.
Кроме того, кажется, что SimpleMessageListenerContainer (?) Все еще используется. Кстати, я автоматически подключаю DirectMessageListenerContainer
, поэтому я могу динамически устанавливать очереди, которые я использовал.
весенний кролик: 2.0.3. РЕЛИЗ.
Пружинная загрузка: 2.0.1.RELEASE.
Конфигурация RabbitMQ:
@Configuration
@EnableRabbit
public class MessagingConfiguration implements ShutdownListener {
@Autowired
private RabbitListenerEndpointRegistry registry;
@Autowired
private DirectMessageListenerContainer container;
@Bean
public DirectMessageListenerContainer messageListenerContainer(final ConnectionFactory connectionFactory) {
final DirectMessageListenerContainer listenerContainer = new DirectMessageListenerContainer();
listenerContainer.setConnectionFactory(connectionFactory);
listenerContainer.setMessageConverter(jsonConverter()); // i.e.@RabbitListener to use Jackson2JsonMessageConverter
listenerContainer.setAutoStartup(false);
// container.setErrorHandler(errorHandler);
final MessageListenerAdapter messageListener = new MessageListenerAdapter(new Object() {
@SuppressWarnings("unused")
public String handleMessage(final String message) {
return message.toUpperCase();
}
});
messageListener.setBeforeSendReplyPostProcessors(new GZipPostProcessor());
listenerContainer.setMessageListener(messageListener);
listenerContainer.setAfterReceivePostProcessors(new GUnzipPostProcessor());
return listenerContainer;
}
@EventListener(ApplicationDatabaseReadyEvent.class)
public void onApplicationDatabaseReadyEvent() {
log.info("Starting all RabbitMQ Listeners..."); //$NON-NLS-1$
for (final MessageListenerContainer listenerContainer : registry.getListenerContainers()) {
listenerContainer.start();
}
log.info("Register is running: {}", registry.isRunning()); //$NON-NLS-1$
log.info("Started all RabbitMQ Listeners."); //$NON-NLS-1$
}
@Bean
public List<Declarable> bindings() {
final List<Declarable> declarations = new ArrayList<>();
final FanoutExchange exchange = new FanoutExchange("fx", true, false);
final Queue queue = QueueBuilder.durable("orders").build();
declarations.add(exchange);
declarations.add(queue);
declarations.add(BindingBuilder.bind(queue).to(exchange));
List<String> q = new ArrayList<>();
q.add(queue.getName());
container.addQueueNames(q.toArray(new String[queues.size()]));
return declarations;
}
@Bean
public Jackson2JsonMessageConverter jsonConverter() {
final Jackson2JsonMessageConverter converter = new Jackson2JsonMessageConverter();
converter.setClassMapper(classMapper());
return converter;
}
private static DefaultJackson2JavaTypeMapper classMapper() {
final DefaultJackson2JavaTypeMapper classMapper = new DefaultJackson2JavaTypeMapper();
classMapper.setTrustedPackages("*"); //$NON-NLS-1$ //TODO add trusted packages
return classMapper;
}
@ConditionalOnProperty(name = "consumer", havingValue = "true")
@Bean
public ConsumerListener listenerConsumer() {
return new ConsumerListener();
}
@ConditionalOnProperty(name = "producer", havingValue = "true")
@Bean
public ProducerListener listenerProducer() {
return new ProducerListener();
}
@Bean
public RabbitAdmin rabbitAdmin(final CachingConnectionFactory connectionFactory) {
return new RabbitAdmin(connectionFactory);
}
@Bean
public RabbitTemplate rabbitTemplate(final ConnectionFactory connectionFactory) {
final RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
rabbitTemplate.setMessageConverter(jsonConverter()); // convert all sent messages to JSON
rabbitTemplate.setReplyTimeout(TimeUnit.SECONDS.toMillis(3));
rabbitTemplate.setReceiveTimeout(TimeUnit.SECONDS.toMillis(3));
return rabbitTemplate;
}
@Override
public void shutdownCompleted(final ShutdownSignalException arg0) {
}
}