Ответ, полученный после истечения времени ожидания при использовании нескольких объектов RabbitTemplate - PullRequest
0 голосов
/ 12 июня 2018

У меня есть приложение Java Spring Boot 1.5.10, с которым я подключаюсь к двум различным серверам RabbitMQ.Один сервер RabbitMQ работает на том же хосте, что и приложение Spring Boot, а другой - на другом / удаленном хосте.Эта версия Spring Boot включает в себя org.springframework.amqp: spring-amqp: jar: 1.7.6.RELEASE, кстати.Итак, вот мой код конфигурации, который относится к локальному серверу RabbitMQ:

@Bean
public ConnectionFactory rabbitConnectionFactory() {
    CachingConnectionFactory factory = new CachingConnectionFactory(host);
    factory.setVirtualHost(vhost);
    factory.setUsername(username);
    factory.setPassword(password);
    factory.setChannelCacheSize(2);

    // Add a custom client connection property, which will show up in the Admin UI (useful for troubleshooting).
    factory.getRabbitConnectionFactory().getClientProperties().put("Connection Type", "Local");

    return factory;
}

@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory rabbitConnectionFactory,
        MessageConverter jackson2JsonMessageConverter) {
    RabbitTemplate rabbitTemplate = new RabbitTemplate(rabbitConnectionFactory);
    rabbitTemplate.setMessageConverter(jackson2JsonMessageConverter);
    return rabbitTemplate;
}

@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory rabbitConnectionFactory) {
    SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
    factory.setConnectionFactory(rabbitConnectionFactory);
    return factory;
}

@Bean
public RabbitAdmin admin(ConnectionFactory rabbitConnectionFactory) {
    RabbitAdmin rabbitAdmin = new RabbitAdmin(rabbitConnectionFactory);
    rabbitAdmin.afterPropertiesSet();
    rabbitAdmin.setAutoStartup(false);
    return rabbitAdmin;
}

А вот код моей конфигурации для сервера RabbitMQ, работающего удаленно на другом хосте:

@Bean
public ConnectionFactory remoteConnectionFactory() {
    CachingConnectionFactory factory = new CachingConnectionFactory(remoteHost);
    factory.setVirtualHost(remoteVhost);
    factory.setUsername(remoteUsername);
    factory.setPassword(remotePassword);

    // By default, only one Channel will be cached, with further requested Channels being created and disposed on demand.
    // Consider raising the "channelCacheSize" value in case of a high-concurrency environment.
    factory.setChannelCacheSize(3);

    factory.setConnectionThreadFactory(new CustomizableThreadFactory("RemoteRabbit-"));

    // Add a custom client connection property, which will show up in the Admin UI (useful for troubleshooting).
    factory.getRabbitConnectionFactory().getClientProperties().put("Connection Type", "Remote");

    return factory;
}

@Bean
public RabbitAdmin remoteAdmin(ConnectionFactory remoteConnectionFactory) {
    RabbitAdmin rabbitAdmin = new RabbitAdmin(remoteConnectionFactory);
    rabbitAdmin.setIgnoreDeclarationExceptions(true);
    return rabbitAdmin;
}

@Bean
public SimpleRabbitListenerContainerFactory remoteContainerFactory(ConnectionFactory remoteConnectionFactory,
        MessageConverter jackson2JsonMessageConverter) {

    SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
    factory.setConnectionFactory(remoteConnectionFactory);
    factory.setConcurrentConsumers(1);
    factory.setMessageConverter(jackson2JsonMessageConverter);
    factory.setMaxConcurrentConsumers(5);
    factory.setDefaultRequeueRejected(false); // on error, don't put back in the queue
    return factory;
}

Теперь самое интересное.У меня есть другой RabbitTemplate, который я вызываю convertSendAndReceive (), для которого настроено remoteConnectionFactory.

@Bean
public RabbitTemplate payTemplate(ConnectionFactory remoteConnectionFactory,
        TopicExchange fromExchange, MessageConverter jackson2JsonMessageConverter) {
    RabbitTemplate rabbitTemplate = new RabbitTemplate(remoteConnectionFactory);
    rabbitTemplate.setReplyAddress(fromExchange.getName() + "/" + buildMyBindingKey());
    rabbitTemplate.setReplyTimeout(30000L);
    rabbitTemplate.setMessageConverter(jackson2JsonMessageConverter);
    return rabbitTemplate;
}

@Bean
public SimpleMessageListenerContainer payReplyContainer(ConnectionFactory remoteConnectionFactory,
        RabbitTemplate payTemplate, Queue fromQueue) {
    SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
    container.setConnectionFactory(remoteConnectionFactory);
    container.setQueues(fromQueue);
    container.setMessageListener(payTemplate);
    return container;
}

@Bean
public TopicExchange fromExchange(RabbitAdmin remoteAdmin) {
    TopicExchange exchange = new TopicExchange("some.from.exchange", true, false);
    exchange.setAdminsThatShouldDeclare(remoteAdmin);
    return exchange;
}

@Bean
public Queue fromQueue(RabbitAdmin remoteAdmin) {
    Queue queue = new Queue("myReplyQueue");
    queue.setAdminsThatShouldDeclare(corporateAdmin);
    return queue;
}

private String buildMyBindingKey() {
    return "someBindingKeyHere";
}

Моя проблема возникает, когда я вызываю convertSendAndReceive () для payTemplate.Ответ получен в порядке, но почти кажется, что он был получен дважды.Это работало, когда я подключался только к одному серверу RabbitMQ, но после настройки подключений к двум серверам я получаю следующее:

2018-06-11 18:29:57,156|WARN||payReplyContainer-1|org.springframework.amqp.rabbit.core.RabbitTemplate|||||Reply received after timeout for 1

2018-06-11 18:29:57,165|WARN||payReplyContainer-1|org.springframework.amqp.rabbit.listener.ConditionalRejectingErrorHandler|||||Execution of Rabbit message listener failed.
org.springframework.amqp.rabbit.listener.exception.ListenerExecutionFailedException: Listener threw exception
        at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.wrapToListenerExecutionFailedExceptionIfNeeded(AbstractMessageListenerContainer.java:949)
        at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:902)
        at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:790)
        at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$001(SimpleMessageListenerContainer.java:105)
        at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$1.invokeListener(SimpleMessageListenerContainer.java:208)
        at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.invokeListener(SimpleMessageListenerContainer.java:1349)
        at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:760)
        at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:1292)
        at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:1262)
        at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$1800(SimpleMessageListenerContainer.java:105)
        at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1518)
        at java.lang.Thread.run(Thread.java:748)
Caused by: org.springframework.amqp.AmqpRejectAndDontRequeueException: Reply received after timeout
        at org.springframework.amqp.rabbit.core.RabbitTemplate.onMessage(RabbitTemplate.java:1759)
        at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:899)
        ... 10 common frames omitted

Опять-таки, payTemplate получает ответ / ответ, которого он ожидал, но егокак контейнер получил другое сообщение, которого никто не ждал.Я в тупике.Любая помощь приветствуется.

1 Ответ

0 голосов
/ 14 июня 2018

Действительно, именно обмен (дополнительная привязка, которого там не должно было быть) был ответственен за дублирование ответа.Большое спасибо Гэри за то, что он просмотрел мой код и предположительно не увидел ничего плохого и предложил кое-что посмотреть.

...