У меня есть приложение Java Spring Boot 1.5.10, с которым я подключаюсь к двум различным серверам RabbitMQ.Один сервер RabbitMQ работает на том же хосте, что и приложение Spring Boot, а другой - на другом / удаленном хосте.Эта версия Spring Boot включает в себя org.springframework.amqp: spring-amqp: jar: 1.7.6.RELEASE, кстати.Итак, вот мой код конфигурации, который относится к локальному серверу RabbitMQ:
@Bean
public ConnectionFactory rabbitConnectionFactory() {
CachingConnectionFactory factory = new CachingConnectionFactory(host);
factory.setVirtualHost(vhost);
factory.setUsername(username);
factory.setPassword(password);
factory.setChannelCacheSize(2);
// Add a custom client connection property, which will show up in the Admin UI (useful for troubleshooting).
factory.getRabbitConnectionFactory().getClientProperties().put("Connection Type", "Local");
return factory;
}
@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory rabbitConnectionFactory,
MessageConverter jackson2JsonMessageConverter) {
RabbitTemplate rabbitTemplate = new RabbitTemplate(rabbitConnectionFactory);
rabbitTemplate.setMessageConverter(jackson2JsonMessageConverter);
return rabbitTemplate;
}
@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory rabbitConnectionFactory) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(rabbitConnectionFactory);
return factory;
}
@Bean
public RabbitAdmin admin(ConnectionFactory rabbitConnectionFactory) {
RabbitAdmin rabbitAdmin = new RabbitAdmin(rabbitConnectionFactory);
rabbitAdmin.afterPropertiesSet();
rabbitAdmin.setAutoStartup(false);
return rabbitAdmin;
}
А вот код моей конфигурации для сервера RabbitMQ, работающего удаленно на другом хосте:
@Bean
public ConnectionFactory remoteConnectionFactory() {
CachingConnectionFactory factory = new CachingConnectionFactory(remoteHost);
factory.setVirtualHost(remoteVhost);
factory.setUsername(remoteUsername);
factory.setPassword(remotePassword);
// By default, only one Channel will be cached, with further requested Channels being created and disposed on demand.
// Consider raising the "channelCacheSize" value in case of a high-concurrency environment.
factory.setChannelCacheSize(3);
factory.setConnectionThreadFactory(new CustomizableThreadFactory("RemoteRabbit-"));
// Add a custom client connection property, which will show up in the Admin UI (useful for troubleshooting).
factory.getRabbitConnectionFactory().getClientProperties().put("Connection Type", "Remote");
return factory;
}
@Bean
public RabbitAdmin remoteAdmin(ConnectionFactory remoteConnectionFactory) {
RabbitAdmin rabbitAdmin = new RabbitAdmin(remoteConnectionFactory);
rabbitAdmin.setIgnoreDeclarationExceptions(true);
return rabbitAdmin;
}
@Bean
public SimpleRabbitListenerContainerFactory remoteContainerFactory(ConnectionFactory remoteConnectionFactory,
MessageConverter jackson2JsonMessageConverter) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(remoteConnectionFactory);
factory.setConcurrentConsumers(1);
factory.setMessageConverter(jackson2JsonMessageConverter);
factory.setMaxConcurrentConsumers(5);
factory.setDefaultRequeueRejected(false); // on error, don't put back in the queue
return factory;
}
Теперь самое интересное.У меня есть другой RabbitTemplate, который я вызываю convertSendAndReceive (), для которого настроено remoteConnectionFactory.
@Bean
public RabbitTemplate payTemplate(ConnectionFactory remoteConnectionFactory,
TopicExchange fromExchange, MessageConverter jackson2JsonMessageConverter) {
RabbitTemplate rabbitTemplate = new RabbitTemplate(remoteConnectionFactory);
rabbitTemplate.setReplyAddress(fromExchange.getName() + "/" + buildMyBindingKey());
rabbitTemplate.setReplyTimeout(30000L);
rabbitTemplate.setMessageConverter(jackson2JsonMessageConverter);
return rabbitTemplate;
}
@Bean
public SimpleMessageListenerContainer payReplyContainer(ConnectionFactory remoteConnectionFactory,
RabbitTemplate payTemplate, Queue fromQueue) {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setConnectionFactory(remoteConnectionFactory);
container.setQueues(fromQueue);
container.setMessageListener(payTemplate);
return container;
}
@Bean
public TopicExchange fromExchange(RabbitAdmin remoteAdmin) {
TopicExchange exchange = new TopicExchange("some.from.exchange", true, false);
exchange.setAdminsThatShouldDeclare(remoteAdmin);
return exchange;
}
@Bean
public Queue fromQueue(RabbitAdmin remoteAdmin) {
Queue queue = new Queue("myReplyQueue");
queue.setAdminsThatShouldDeclare(corporateAdmin);
return queue;
}
private String buildMyBindingKey() {
return "someBindingKeyHere";
}
Моя проблема возникает, когда я вызываю convertSendAndReceive () для payTemplate.Ответ получен в порядке, но почти кажется, что он был получен дважды.Это работало, когда я подключался только к одному серверу RabbitMQ, но после настройки подключений к двум серверам я получаю следующее:
2018-06-11 18:29:57,156|WARN||payReplyContainer-1|org.springframework.amqp.rabbit.core.RabbitTemplate|||||Reply received after timeout for 1
2018-06-11 18:29:57,165|WARN||payReplyContainer-1|org.springframework.amqp.rabbit.listener.ConditionalRejectingErrorHandler|||||Execution of Rabbit message listener failed.
org.springframework.amqp.rabbit.listener.exception.ListenerExecutionFailedException: Listener threw exception
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.wrapToListenerExecutionFailedExceptionIfNeeded(AbstractMessageListenerContainer.java:949)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:902)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:790)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$001(SimpleMessageListenerContainer.java:105)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$1.invokeListener(SimpleMessageListenerContainer.java:208)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.invokeListener(SimpleMessageListenerContainer.java:1349)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:760)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:1292)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:1262)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$1800(SimpleMessageListenerContainer.java:105)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1518)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.springframework.amqp.AmqpRejectAndDontRequeueException: Reply received after timeout
at org.springframework.amqp.rabbit.core.RabbitTemplate.onMessage(RabbitTemplate.java:1759)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:899)
... 10 common frames omitted
Опять-таки, payTemplate получает ответ / ответ, которого он ожидал, но егокак контейнер получил другое сообщение, которого никто не ждал.Я в тупике.Любая помощь приветствуется.