Я пытаюсь установить максимальное количество потребителей конечной точки темы с помощью jms, используя solace в качестве посредника, поэтому для увеличения нагрузки можно запустить несколько экземпляров приложения в cloudfoundry, и несколько подписчиков могут использовать сообщения одной и той же темы. .
Я перепробовал несколько комбинаций из приведенных ниже настроек (setConcurrency(), setConcurrentConsumers(), setMaxConcurrentConsumers()
, (20 - произвольное большое число). Судя по документации, мне определенно нужно использовать setMaxConcurrentConsumers()
и установить для него соответственно высокое значение.
Когда я развертываю приложение, создается конечная точка темы, но когда я смотрю на интерфейс управления утешением, максимальное число потребителей всегда равно 1 (как можно увидеть здесь: Queues -> Topic Endpoints -> select endpoint -> Configured Limit
), даже если оно должно быть 20 Таким образом, второй потребитель не может подключиться. Я не хочу устанавливать это вручную каждый раз, когда я развертываю приложение.
import javax.jms.*;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jms.connection.CachingConnectionFactory;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.listener.DefaultMessageListenerContainer;
@Configuration
public class ProducerConfiguration {
private static final Log logger = LogFactory.getLog(SolaceController.class);
@Value("${durable_subscription}")
private String subscriptionName;
@Value("${topic_name}")
private String topic_name;
@Autowired
private ConnectionFactory connectionFactory;
@Bean
public JmsTemplate jmsTemplate() {
CachingConnectionFactory ccf = new CachingConnectionFactory(connectionFactory);
JmsTemplate jmst = new JmsTemplate(ccf);
jmst.setPubSubDomain(true);
return jmst;
}
@Bean
public Session configureSession(ConnectionFactory connectionFactory) throws JMSException {
return connectionFactory.createConnection().createSession(false, Session.AUTO_ACKNOWLEDGE);
}
private TextMessage lastReceivedMessage;
public class SimpleMessageListener implements MessageListener {
@Override
public void onMessage(Message message) {
if (message instanceof TextMessage) {
lastReceivedMessage = (TextMessage) message;
try {
logger.info("Received message : " + lastReceivedMessage.getText());
} catch (JMSException e) {
logger.error("Error getting text of the received TextMessage: " + e);
}
} else {
logger.error("Received message that was not a TextMessage: " + message);
}
}
}
@Bean
public DefaultMessageListenerContainer orderMessageListenerContainer() {
DefaultMessageListenerContainer lc = new DefaultMessageListenerContainer();
lc.setConnectionFactory(connectionFactory);
lc.setDestinationName(topic_name);
lc.setMessageListener(new SimpleMessageListener());
lc.setDurableSubscriptionName(subscriptionName);
lc.setPubSubDomain(true);
//tried multiple combinations here, also setting only setMaxConcurrentConsumers
lc.setConcurrency("2-20");
lc.setConcurrentConsumers(20);
lc.setMaxConcurrentConsumers(20);
lc.setSubscriptionDurable(true);
lc.initialize();
lc.start();
return lc;
}
}