Сжатие и распаковка сообщений Spring RabbitMQ с помощью DirectMessageListenerContainer - PullRequest
0 голосов
/ 07 мая 2018

Я изменил свой RabbitMQ из предыдущего поста (десериализация spring-rabbit JSON для содержимого ArrayList ), чтобы теперь использовать DirectMessageListener с MessagePostProcessors для GZip и GUnzip полезных нагрузок сообщений.

Однако, похоже, он не работает, поскольку точки останова не активированы, но также потому, что мои RabbitListener s больше не получают сообщения, тогда как они сделали с SimpleMessageFactoryListenerContainer. Кроме того, кажется, что SimpleMessageListenerContainer (?) Все еще используется. Кстати, я автоматически подключаю DirectMessageListenerContainer, поэтому я могу динамически устанавливать очереди, которые я использовал.


весенний кролик: 2.0.3. РЕЛИЗ. Пружинная загрузка: 2.0.1.RELEASE.


Конфигурация RabbitMQ:

@Configuration
@EnableRabbit
public class MessagingConfiguration implements ShutdownListener {

    @Autowired
    private RabbitListenerEndpointRegistry registry;

    @Autowired
    private DirectMessageListenerContainer container;

    @Bean
    public DirectMessageListenerContainer messageListenerContainer(final ConnectionFactory connectionFactory) {
        final DirectMessageListenerContainer listenerContainer = new DirectMessageListenerContainer();
        listenerContainer.setConnectionFactory(connectionFactory);
        listenerContainer.setMessageConverter(jsonConverter()); // i.e.@RabbitListener to use Jackson2JsonMessageConverter
        listenerContainer.setAutoStartup(false);
        // container.setErrorHandler(errorHandler);
        final MessageListenerAdapter messageListener = new MessageListenerAdapter(new Object() {
            @SuppressWarnings("unused")
            public String handleMessage(final String message) {
                return message.toUpperCase();
            }
        });
        messageListener.setBeforeSendReplyPostProcessors(new GZipPostProcessor());
        listenerContainer.setMessageListener(messageListener);
        listenerContainer.setAfterReceivePostProcessors(new GUnzipPostProcessor());
        return listenerContainer;
    }

    @EventListener(ApplicationDatabaseReadyEvent.class)
    public void onApplicationDatabaseReadyEvent() {
        log.info("Starting all RabbitMQ Listeners..."); //$NON-NLS-1$
        for (final MessageListenerContainer listenerContainer : registry.getListenerContainers()) {
            listenerContainer.start();
        }
        log.info("Register is running: {}", registry.isRunning()); //$NON-NLS-1$
        log.info("Started all RabbitMQ Listeners."); //$NON-NLS-1$
    }

    @Bean
    public List<Declarable> bindings() {
    final List<Declarable> declarations = new ArrayList<>();
        final FanoutExchange exchange = new FanoutExchange("fx", true, false);
        final Queue queue = QueueBuilder.durable("orders").build();
        declarations.add(exchange);
        declarations.add(queue);
        declarations.add(BindingBuilder.bind(queue).to(exchange));
        List<String> q = new ArrayList<>();
        q.add(queue.getName());
    container.addQueueNames(q.toArray(new String[queues.size()]));

        return declarations;
    }

    @Bean
    public Jackson2JsonMessageConverter jsonConverter() {
        final Jackson2JsonMessageConverter converter = new Jackson2JsonMessageConverter();
        converter.setClassMapper(classMapper());
        return converter;
    }

    private static DefaultJackson2JavaTypeMapper classMapper() {
        final DefaultJackson2JavaTypeMapper classMapper = new DefaultJackson2JavaTypeMapper();
        classMapper.setTrustedPackages("*"); //$NON-NLS-1$  //TODO add trusted packages
        return classMapper;
    }

    @ConditionalOnProperty(name = "consumer", havingValue = "true")
    @Bean
    public ConsumerListener listenerConsumer() {
        return new ConsumerListener();
    }

    @ConditionalOnProperty(name = "producer", havingValue = "true")
    @Bean
    public ProducerListener listenerProducer() {
        return new ProducerListener();
    }

    @Bean
    public RabbitAdmin rabbitAdmin(final CachingConnectionFactory connectionFactory) {
        return new RabbitAdmin(connectionFactory);
    }

    @Bean
    public RabbitTemplate rabbitTemplate(final ConnectionFactory connectionFactory) {
        final RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        rabbitTemplate.setMessageConverter(jsonConverter()); // convert all sent messages to JSON
        rabbitTemplate.setReplyTimeout(TimeUnit.SECONDS.toMillis(3));
        rabbitTemplate.setReceiveTimeout(TimeUnit.SECONDS.toMillis(3));
        return rabbitTemplate;
    }

    @Override
    public void shutdownCompleted(final ShutdownSignalException arg0) {
    }
}

1 Ответ

0 голосов
/ 08 мая 2018

Это не работает, вы не можете автоматически соединять контейнеры в течение @RabbitListener с; они не бобы; они создаются контейнерной фабрикой и регистрируются в реестре. Вместо этого вы должны извлечь их из реестра (по идентификатору).

Однако, поскольку для autoStartup установлено значение false, он не должен «красть» сообщения от вашего @RabbitListener.

Как правило, регистрация отладки должна помочь.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...