Spring jmsListener для прослушивания нескольких очередей - PullRequest
0 голосов
/ 06 апреля 2019

В этом посте Гарри Рассел объяснил, как программно создать несколько KafkaListeners для прослушивания из нескольких тем .. [эта настройка на самом деле успешно работает для меня] Kafka Spring: как создавать слушатели динамически или в цикле?

Теперь я хочу, чтобы аналогичная установка работала и для JMSListener - где у меня может быть один класс с одним @JMSListener в нем, и я могу программно создавать несколько экземпляров этого JMSListener, каждый из которых вводится со своим собственным queueName.

Я нашел это сообщение Spring JMS начинает прослушивать очереди jms по запросу

В конце этого поста Гэри сделал аналогичный комментарий,

Если вы хотите динамически создавать много контейнеров, просто создайте контейнеры программно, вызовите afterPropertiesSet (), затем start ()

Я использовал настройку, с которой работал в первом посте выше (связанном с KafkaListeners), мои множественные экземпляры JMS-слушателей запускаются, но не принимают никаких сообщений.

По сути, я не понял, где я могу это сделать

, затем просто создайте контейнеры программно, вызовите afterPropertiesSet (), затем start ()

Я запутался со словом - контейнер, я знаю, что есть JMSListener, и есть JmsListenerContainerFactory, что такое контейнер в этом контексте - я думаю, JMSListener?

Я подтвердил, что в очереди есть сообщения. также, когда я не создаю слушателей программным путем и просто имею одного слушателя с жестко закодированной очередью, упомянутой на нем, он отлично использует сообщение.

По сути, ни один из слушателей не использует сообщения, когда я создаю несколько слушателей JMS программно

    @SpringBootApplication
@EnableJms
public class MqProdConsumerApplication {
    private static Logger logger = LogManager.getLogger(MqProdConsumerApplication.class.getName());
    private static Consumers consumersStatic;

    @Autowired
    Consumers consumers;

    @PostConstruct
    public void init() {
        consumersStatic = this.consumers;
    }

    @Bean
    public Gson gson() {
        return new Gson();
    }

    public static void main(String[] args) {
        ConfigurableApplicationContext context = SpringApplication.run(MqProdConsumerApplication.class, args);
        List<QueueInformation> queueInformationList = consumersStatic.getQueueInformationList();
        Assert.notEmpty(queueInformationList, "queueInformationList cannot be empty");
        logger.debug("queueInformationList ************" + queueInformationList.toString());
        for (QueueInformation queueInformation : queueInformationList) {
            AnnotationConfigApplicationContext child = new AnnotationConfigApplicationContext();
            child.setParent(context);
            child.register(MQConfig.class);
            Properties props = new Properties();
            props.setProperty("mqQueueName", queueInformation.getMqQueueName());
            //
            PropertiesPropertySource pps = new PropertiesPropertySource("listenerProps", props);
            child.getEnvironment().getPropertySources().addLast(pps);
            child.refresh();
        }
    }
}

Вот MQConfig, у которого есть listenerContainerFactory

@Configuration
public class MQConfig {
    Logger logger = LoggerFactory.getLogger(this.getClass());

    @Value("${ibm.mq.user}")
    private String mqUserName;

    @Bean
    public MQListener listener() {
        return new MQListener();
    }

    @PostConstruct
    public void afterConstruct() {
        logger.debug("************* initialized MQ Config successfully for user =" + mqUserName);
    }

    @Bean
    public JmsListenerContainerFactory<?> myFactory(ConnectionFactory connectionFactory,
                                                    DefaultJmsListenerContainerFactoryConfigurer configurer) {
        DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
        configurer.configure(factory, connectionFactory);

        // Put the MQ username in the PCF environment.
        // Otherwise, the connection is identified by PCF's default user, "VCAP"
        System.setProperty("user.name", mqUserName);
        return factory;
    }
}

Затем прибывает MQListener, который имеет фактический @JMSListener

    public class MQListener {
    Logger logger = LoggerFactory.getLogger(this.getClass());

    @Value("${mqQueueName}")
    private String mqQueueName;

    @PostConstruct
    public void afteConstruct() {
        logger.debug("************* initialized MQ Listener successfully, will read from =" + mqQueueName);

    }

    @JmsListener(destination = "${mqQueueName}", containerFactory = "myFactory")
    public void receiveMessage(String receivedMessage) throws JAXBException, ExecutionException, InterruptedException {
        logger.debug("***********************************************receivedMessage:" + receivedMessage);
    }
}

Вот мое application.yml

    ibm.mq.queueManager: ABCTOD01
ibm.mq.channel: QMD00.SERVER
ibm.mq.connName: mqdv1.devfg.ABC.com
ibm.mq.user: pmd0app1
ibm.mq.password:
consumers:
  queueInformationList:
  -
    mqQueueName: QMD00.D.SRF.PERSON.LITE.PHONE.LOAD
  -
    mqQueueName: QMD00.D.SRF.PERSON.PHONE.LOAD

1 Ответ

0 голосов
/ 06 апреля 2019

хорошо, я нашел другое сообщение, где Гэри ответил на то, что я искал Добавление динамического количества слушателей (Spring JMS)

По сути, вот рабочее решение для меня. Отличная работа @GaryRussell - я теперь фанат:)

@Override
public void configureJmsListeners(JmsListenerEndpointRegistrar registrar) {
    List<QueueInformation> queueInformationList = consumersStatic.getQueueInformationList();
    int i = 0;
    for (QueueInformation queueInformation :
            queueInformationList) {
        SimpleJmsListenerEndpoint endpoint = new SimpleJmsListenerEndpoint();
        endpoint.setId("myJmsEndpoint-" + i++);
        endpoint.setDestination(queueInformation.getMqQueueName());
        endpoint.setMessageListener(message -> {
            logger.debug("***********************************************receivedMessage:" + message);
        });
        registrar.registerEndpoint(endpoint);
        logger.debug("registered the endpoint for queue" + queueInformation.getMqQueueName());
    }

}
...