У нас есть проблема, похожая на этот старый вопрос . Наша установка, однако, немного отличается. Например, пульс должен уже присутствовать, так как у нас есть InactivityMonitor по умолчанию из ActiveMQ.
У нас есть клиент, который использует встроенный брокер. Встроенный посредник имеет сетевой соединитель с удаленным посредником, работающим как автономный сервис на компьютере. Таким образом, мы можем отделить связь между клиентом и сервером. Встроенный брокер служит локальной очередью для клиента.
Клиент отправляет сообщения встроенному брокеру. Эти сообщения либо передаются через сетевой соединитель на удаленный посредник, либо (когда соединение временно недоступно) остаются во встроенном посреднике до тех пор, пока соединение не будет восстановлено.
И встроенный посредник, и удаленный посредник являются экземплярами Apache ActiveMQ. Реализация JMS основана на Spring JMS.
На практике мы иногда видим странное поведение (обычно после длительного периода без каких-либо проблем):
- сетевой разъем указан на вкладке соединений на консоли управления удаленного посредника. Однако не все сообщения доставляются удаленному посреднику. Обычно байтовые сообщения застряли во встроенном посреднике, а текстовые сообщения доставляются в очереди на удаленном посреднике.
- сетевой разъем указан на вкладке подключений консоли управления удаленного брокера. Однако на удаленном посреднике сообщения не доставляются.
Монитор неактивности включен на удаленном посреднике. Встроенный брокер создается с использованием кода, показанного ниже (код SSL для краткости опущен).
Мы понятия не имеем, что может вызвать проблему, и, что более важно, почему установка не восстанавливается автоматически. Мы ожидали, что эта настройка автоматически обнаружит, что соединение недоступно, и настроит новое соединение. Похоже, этого не происходит.
У кого-нибудь есть идея, с которой мы могли бы начать искать, или, что еще лучше, есть идея, в чем может быть проблема?
@Bean
public ActiveMQConnectionFactory activeMQConnectionFactory(final Optional<TransportListener> transportListener)
{
final ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory();
activeMQConnectionFactory.setClientIDPrefix(clientIDPrefix);
activeMQConnectionFactory.setBrokerURL(this.env.getProperty("activemq.connection.url"));
activeMQConnectionFactory.setUserName(this.env.getProperty("activemq.users.username"));
activeMQConnectionFactory.setPassword(this.env.getProperty("activemq.users.password"));
transportListener.ifPresent(activeMQConnectionFactory::setTransportListener);
return activeMQConnectionFactory;
}
@Bean
public CachingConnectionFactory cachingConnectionFactory(final ActiveMQConnectionFactory activeMQConnectionFactory)
{
return new CachingConnectionFactory(activeMQConnectionFactory);
}
@Bean
public JmsTemplate jmsTemplate(final CachingConnectionFactory cachingConnectionFactory)
{
final JmsTemplate jmsTemplate = new JmsTemplate(cachingConnectionFactory);
return jmsTemplate;
}
@Bean
public DefaultJmsListenerContainerFactory jmsListenerContainerFactory(final ActiveMQConnectionFactory activeMQConnectionFactory)
{
final DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
factory.setConnectionFactory(activeMQConnectionFactory);
factory.setConcurrency("3-10");
return factory;
}
@Bean
public MessageSender messageSender()
{
return new MessageSender();
}
@Bean(initMethod = "start", destroyMethod = "stop")
public BrokerService bufferingBroker() throws Exception
{
final BrokerService broker = new BrokerService();
String brokerName = UUID.randomUUID().toString();
broker.setBrokerName(brokerName);
final NetworkConnector networkConnector = new DiscoveryNetworkConnector(
new URI("static://" + this.env.getProperty("activemq.connection.url")));
networkConnector.setUserName(this.env.getProperty("activemq.users.username"));
networkConnector.setPassword(this.env.getProperty("activemq.users.password"));
networkConnector.setNetworkTTL(5);
broker.addNetworkConnector(networkConnector);
return broker;
}
@Bean
public ActiveMQConnectionFactory embeddedActiveMQConnectionFactory(final BrokerService bufferingBroker) throws Exception
{
final ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory();
activeMQConnectionFactory.setBrokerURL("vm://" + bufferingBroker.getBrokerName() + "?create=false");
return activeMQConnectionFactory;
}
@Bean
public CachingConnectionFactory embeddedCachingConnectionFactory(final ActiveMQConnectionFactory embeddedActiveMQConnectionFactory) throws Exception
{
return new CachingConnectionFactory(embeddedActiveMQConnectionFactory);
}
@Bean
public JmsTemplate embeddedJmsTemplate(final CachingConnectionFactory embeddedCachingConnectionFactory) throws Exception
{
return new JmsTemplate(embeddedCachingConnectionFactory);
}
@Bean
public DefaultJmsListenerContainerFactory embeddedJmsListenerContainerFactory(final ActiveMQConnectionFactory embeddedActiveMQConnectionFactory) throws Exception
{
final DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
factory.setConnectionFactory(embeddedActiveMQConnectionFactory);
factory.setConcurrency("3-10");
return factory;
}
@Bean
public BufferedMessageSender bufferedMessageSender()
{
return new BufferedMessageSender();
}