Spring AMQP: добавление обработчиков сообщений к автоматически настроенному `RabbitTemplate` - PullRequest
0 голосов
/ 15 мая 2019

Я пытаюсь дозвониться до RabbitTemplate#addBeforePublishPostProcessors и RabbitTemplate#addAfterReceivePostProcessors без особых проблем с автоконфигурацией Spring.

Я пытаюсь это сделать, но мой MessagePostProcessor не запускается (я не вижу 'test_header' в сообщении, которое публикуется).

  @EventListener
  void test(ApplicationPreparedEvent event) {
    ConfigurableApplicationContext applicationContext = event.getApplicationContext();
    RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);
    rabbitTemplate.addBeforePublishPostProcessors(new MessagePostProcessor() {
      @Override
      public Message postProcessMessage(Message message) throws AmqpException {
        message.getMessageProperties().setHeader("test_header", "test_header_value");
        return message;
      }
    });
  }

Какое место лучше всего для этой цели?

Я тоже пытался прослушать ApplicationStartedEvent.

Обновление:

Добавил этот боб в мой @Configuration класс согласно рекомендации Гэри:

  @Bean
  public static BeanPostProcessor rabbitTemplatePostProcessor() {
    return new BeanPostProcessor() {
      @Override
      public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
        if(bean instanceof RabbitTemplate) {
          RabbitTemplate rabbitTemplate = (RabbitTemplate) bean;

          rabbitTemplate.addBeforePublishPostProcessors(m -> {
            m.getMessageProperties()
              .setHeader(MESSAGE_PUBLISHED_TIME, currentTimeMillis());
            return m;
          });

          rabbitTemplate.addAfterReceivePostProcessors(m -> {
            m.getMessageProperties().setHeader(MESSAGE_RECEIVED_TIME, currentTimeMillis());
            return m;
          });
        }
        return bean;
      }
    };
  }

Для всех, кто ищет ответ о том, как это сделать, если вы используете @RabbitListener и @SendTo, пожалуйста, см. Редактирование Гэри его ответа.

1 Ответ

1 голос
/ 15 мая 2019

Используйте BeanPostProcessor.

@SpringBootApplication
public class So56155062Application {

    public static void main(String[] args) {
        SpringApplication.run(So56155062Application.class, args);
    }

    @Bean
    public static BeanPostProcessor bpp() {
        return new BeanPostProcessor() {

            @Override
            public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
                if (bean instanceof RabbitTemplate) {
                    ((RabbitTemplate) bean).setBeforePublishPostProcessors(m -> {
                        m.getMessageProperties().setHeader("foo", "baz");
                        return m;
                    });
                }
                return bean;
            }

        };

    }

    @Bean
    public ApplicationRunner runner(RabbitTemplate template) {
        return args -> template.convertAndSend("foo", "bar");
    }

    @RabbitListener(queues = "foo")
    public void listen(String in, @Header("foo") String header) {
        System.out.println(in + header);
    }

}

Обратите внимание на модификатор static

РЕДАКТИРОВАТЬ

шаблон не используется для ответов;вместо этого постпроцессоры идут на фабрику контейнеров.

@SpringBootApplication
public class So56155062Application {

    public static void main(String[] args) {
        SpringApplication.run(So56155062Application.class, args);
    }

    @Bean
    public static BeanPostProcessor bpp() {
        return new BeanPostProcessor() {

            @Override
            public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
                if (bean instanceof RabbitTemplate) {
                    ((RabbitTemplate) bean).setBeforePublishPostProcessors(m -> {
                        m.getMessageProperties().setHeader("foo", "baz");
                        m.getMessageProperties().setReplyTo("bar");
                        System.out.println("Adding header to outgoing message with payload: " + new String(m.getBody()));
                        return m;
                    });
                }
                else if (bean instanceof AbstractRabbitListenerContainerFactory) {
                    ((AbstractRabbitListenerContainerFactory<?>) bean).setAfterReceivePostProcessors(m -> {
                        m.getMessageProperties().setHeader("qux", "fiz");
                        System.out.println("Adding header to incoming message with payload: " + new String(m.getBody()));
                        return m;
                    });
                    ((AbstractRabbitListenerContainerFactory<?>) bean).setBeforeSendReplyPostProcessors(m -> {
                        m.getMessageProperties().setHeader("foo", "baz");
                        m.getMessageProperties().setReplyTo("bar");
                        System.out.println(
                                "Adding header to outgoing reply message with payload: " + new String(m.getBody()));
                        return m;
                    });
                }
                return bean;
            }

        };

    }

    @Bean
    public ApplicationRunner runner(RabbitTemplate template) {
        return args -> template.convertAndSend("foo", "bar");
    }

    @RabbitListener(queues = "foo")
    @SendTo
    public String listen1(String in, @Header("foo") String header) {
        System.out.println(in + header);
        return in.toUpperCase();
    }

    @RabbitListener(queues = "bar")
    public void listen2(String in) {
        System.out.println(in);
    }

}

и

Adding header to outgoing message with payload: bar
Adding header to incoming message with payload: bar
barbaz
Adding header to outgoing reply message with payload: BAR
Adding header to incoming message with payload: BAR
BAR
...