Я использую AyncRabbitTemplate для публикации сообщений.При указании неверного (несуществующего) имени очереди во время публикации - оно отбрасывает сообщение без вывода сообщений.
Я попытался включить «подтверждение» и «мандат» на AyncRabbitTemplate и добавил необходимые методы обратного вызова, как показано ниже:
@Bean
AsyncRabbitTemplate template() {
RabbitTemplate rabbit = rabbitTemplate();
rabbit.setChannelTransacted(true); //to throw error when channel shuts down in case of incorrect exchange names
AsyncRabbitTemplate asyncRabbitTemplate = new AsyncRabbitTemplate(rabbit, rpcReplyMessageListenerContainer(connectionFactory()));
asyncRabbitTemplate.setEnableConfirms(true);
asyncRabbitTemplate.setMandatory(true); //if the message cannot be delivered to a queue an AmqpMessageReturnedException will be thrown
return asyncRabbitTemplate;
}
@Bean
public SimpleMessageListenerContainer rpcReplyMessageListenerContainer(ConnectionFactory connectionFactory) {
SimpleMessageListenerContainer simpleMessageListenerContainer = new SimpleMessageListenerContainer(connectionFactory);
simpleMessageListenerContainer.setQueueNames(Constants.REPLY_QUEUE);
simpleMessageListenerContainer.setTaskExecutor(Executors.newCachedThreadPool());
simpleMessageListenerContainer.setAcknowledgeMode(AcknowledgeMode.AUTO);
return simpleMessageListenerContainer;
}
@Bean
public RabbitTemplate rabbitTemplate() {
return new RabbitTemplate(connectionFactory());
}
@Bean
public ConnectionFactory connectionFactory() {
CachingConnectionFactory connectionFactory = new CachingConnectionFactory("localhost");
return connectionFactory;
}
И методы обратного вызова как:
RabbitConverterFuture<String> future = this.asyncRabbitTemplate.convertSendAndReceive("",Constants.SNS_QUEUE, "This is the request message ",new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) {
message.getMessageProperties().setTimestamp(new Date());
message.getMessageProperties().setMessageId(UUID.randomUUID().toString());
return message;
}
});
ListenableFuture<Boolean> future2 = future.getConfirm();
future2.addCallback(new ListenableFutureCallback<Boolean>() {
@Override
public void onSuccess(Boolean result) {
System.out.println("Publish Result " + result);
}
@Override
public void onFailure(Throwable ex) {
System.out.println("Publish Failed: " + ex);
}
});