Установить максимальное количество потребителей JMS утешение - PullRequest
0 голосов
/ 05 апреля 2019

Я пытаюсь установить максимальное количество потребителей конечной точки темы с помощью jms, используя solace в качестве посредника, поэтому для увеличения нагрузки можно запустить несколько экземпляров приложения в cloudfoundry, и несколько подписчиков могут использовать сообщения одной и той же темы. .

Я перепробовал несколько комбинаций из приведенных ниже настроек (setConcurrency(), setConcurrentConsumers(), setMaxConcurrentConsumers(), (20 - произвольное большое число). Судя по документации, мне определенно нужно использовать setMaxConcurrentConsumers() и установить для него соответственно высокое значение.

Когда я развертываю приложение, создается конечная точка темы, но когда я смотрю на интерфейс управления утешением, максимальное число потребителей всегда равно 1 (как можно увидеть здесь: Queues -> Topic Endpoints -> select endpoint -> Configured Limit), даже если оно должно быть 20 Таким образом, второй потребитель не может подключиться. Я не хочу устанавливать это вручную каждый раз, когда я развертываю приложение.

Screenshot of my solace management interface

import javax.jms.*;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jms.connection.CachingConnectionFactory;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.listener.DefaultMessageListenerContainer;

@Configuration
public class ProducerConfiguration {

    private static final Log logger = LogFactory.getLog(SolaceController.class);

    @Value("${durable_subscription}")
    private String subscriptionName;

    @Value("${topic_name}")
    private String topic_name;

    @Autowired
    private ConnectionFactory connectionFactory;

    @Bean
    public JmsTemplate jmsTemplate() {
        CachingConnectionFactory ccf = new CachingConnectionFactory(connectionFactory);
        JmsTemplate jmst = new JmsTemplate(ccf);
        jmst.setPubSubDomain(true);
        return jmst;
    }

    @Bean
    public Session configureSession(ConnectionFactory connectionFactory) throws JMSException {
        return connectionFactory.createConnection().createSession(false, Session.AUTO_ACKNOWLEDGE);
    }


    private TextMessage lastReceivedMessage;

    public class SimpleMessageListener implements MessageListener {
        @Override
        public void onMessage(Message message) {

            if (message instanceof TextMessage) {
                lastReceivedMessage = (TextMessage) message;
                try {
                    logger.info("Received message : " + lastReceivedMessage.getText());
                } catch (JMSException e) {
                    logger.error("Error getting text of the received TextMessage: " + e);
                }
            } else {
                logger.error("Received message that was not a TextMessage: " + message);
            }
        }
    }

    @Bean
    public DefaultMessageListenerContainer orderMessageListenerContainer() {

        DefaultMessageListenerContainer lc = new DefaultMessageListenerContainer();
        lc.setConnectionFactory(connectionFactory);
        lc.setDestinationName(topic_name);
        lc.setMessageListener(new SimpleMessageListener());
        lc.setDurableSubscriptionName(subscriptionName);
        lc.setPubSubDomain(true);

        //tried multiple combinations here, also setting only setMaxConcurrentConsumers
        lc.setConcurrency("2-20");
        lc.setConcurrentConsumers(20);
        lc.setMaxConcurrentConsumers(20);

        lc.setSubscriptionDurable(true);
        lc.initialize();
        lc.start();
        return lc;
    }
}

Ответы [ 2 ]

1 голос
/ 08 апреля 2019

Вам необходимо создать неисключительную очередь / конечную точку.

По умолчанию создаваемые вами очереди являются исключительными очередями / конечными точками, что означает, что только один подписчик может связываться с ним в любое время.

Самый простой способ создать такую ​​очередь / конечную точку - через CLI Solace.

Чтобы создать неисключительную очередь в вашей JMS-программе, вам нужно перейти к реализации JMS для Solace, например:

    if (queueName != null) {
        EndpointProperties props = new EndpointProperties();
        props.setAccessType(EndpointProperties.ACCESSTYPE_NONEXCLUSIVE);

        try {
            ((SolConnection)connection).getProperties().getJCSMPSession()
                .provision(JCSMPFactory.onlyInstance().createQueue(queueName), props, 0L);
        } catch (Exception e) {
            e.printStackTrace();
        }

        queue = session.createQueue(queueName);
    }
1 голос
/ 05 апреля 2019

Я думаю, что для вашего случая использования ваш потребитель застрял в очередях. Смотри https://solace.com/blog/topic-subscription-queues/

"... в то время как несколько потребителей могут связываться с очередями Долговечные конечные точки ограничены подпиской на одну тему. Очереди допускают как подписку на несколько тем, так и символы подстановки. "

Если вы не хотите менять своего издателя, попробуйте «Подписка на темы в очередях». То есть очередь может быть настроена на прослушивание темы. И тогда ваши потребители будут получать сообщения из этой очереди.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...