В этом посте Гарри Рассел объяснил, как программно создать несколько KafkaListeners для прослушивания из нескольких тем .. [эта настройка на самом деле успешно работает для меня]
Kafka Spring: как создавать слушатели динамически или в цикле?
Теперь я хочу, чтобы аналогичная установка работала и для JMSListener - где у меня может быть один класс с одним @JMSListener в нем, и я могу программно создавать несколько экземпляров этого JMSListener, каждый из которых вводится со своим собственным queueName.
Я нашел это сообщение
Spring JMS начинает прослушивать очереди jms по запросу
В конце этого поста Гэри сделал аналогичный комментарий,
Если вы хотите динамически создавать много контейнеров, просто создайте контейнеры программно, вызовите afterPropertiesSet (), затем start ()
Я использовал настройку, с которой работал в первом посте выше (связанном с KafkaListeners), мои множественные экземпляры JMS-слушателей запускаются, но не принимают никаких сообщений.
По сути, я не понял, где я могу это сделать
, затем просто создайте контейнеры программно, вызовите afterPropertiesSet (), затем start ()
Я запутался со словом - контейнер, я знаю, что есть JMSListener, и есть
JmsListenerContainerFactory, что такое контейнер в этом контексте - я думаю, JMSListener?
Я подтвердил, что в очереди есть сообщения. также, когда я не создаю слушателей программным путем и просто имею одного слушателя с жестко закодированной очередью, упомянутой на нем, он отлично использует сообщение.
По сути, ни один из слушателей не использует сообщения, когда я создаю несколько слушателей JMS программно
@SpringBootApplication
@EnableJms
public class MqProdConsumerApplication {
private static Logger logger = LogManager.getLogger(MqProdConsumerApplication.class.getName());
private static Consumers consumersStatic;
@Autowired
Consumers consumers;
@PostConstruct
public void init() {
consumersStatic = this.consumers;
}
@Bean
public Gson gson() {
return new Gson();
}
public static void main(String[] args) {
ConfigurableApplicationContext context = SpringApplication.run(MqProdConsumerApplication.class, args);
List<QueueInformation> queueInformationList = consumersStatic.getQueueInformationList();
Assert.notEmpty(queueInformationList, "queueInformationList cannot be empty");
logger.debug("queueInformationList ************" + queueInformationList.toString());
for (QueueInformation queueInformation : queueInformationList) {
AnnotationConfigApplicationContext child = new AnnotationConfigApplicationContext();
child.setParent(context);
child.register(MQConfig.class);
Properties props = new Properties();
props.setProperty("mqQueueName", queueInformation.getMqQueueName());
//
PropertiesPropertySource pps = new PropertiesPropertySource("listenerProps", props);
child.getEnvironment().getPropertySources().addLast(pps);
child.refresh();
}
}
}
Вот MQConfig, у которого есть listenerContainerFactory
@Configuration
public class MQConfig {
Logger logger = LoggerFactory.getLogger(this.getClass());
@Value("${ibm.mq.user}")
private String mqUserName;
@Bean
public MQListener listener() {
return new MQListener();
}
@PostConstruct
public void afterConstruct() {
logger.debug("************* initialized MQ Config successfully for user =" + mqUserName);
}
@Bean
public JmsListenerContainerFactory<?> myFactory(ConnectionFactory connectionFactory,
DefaultJmsListenerContainerFactoryConfigurer configurer) {
DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
configurer.configure(factory, connectionFactory);
// Put the MQ username in the PCF environment.
// Otherwise, the connection is identified by PCF's default user, "VCAP"
System.setProperty("user.name", mqUserName);
return factory;
}
}
Затем прибывает MQListener, который имеет фактический @JMSListener
public class MQListener {
Logger logger = LoggerFactory.getLogger(this.getClass());
@Value("${mqQueueName}")
private String mqQueueName;
@PostConstruct
public void afteConstruct() {
logger.debug("************* initialized MQ Listener successfully, will read from =" + mqQueueName);
}
@JmsListener(destination = "${mqQueueName}", containerFactory = "myFactory")
public void receiveMessage(String receivedMessage) throws JAXBException, ExecutionException, InterruptedException {
logger.debug("***********************************************receivedMessage:" + receivedMessage);
}
}
Вот мое application.yml
ibm.mq.queueManager: ABCTOD01
ibm.mq.channel: QMD00.SERVER
ibm.mq.connName: mqdv1.devfg.ABC.com
ibm.mq.user: pmd0app1
ibm.mq.password:
consumers:
queueInformationList:
-
mqQueueName: QMD00.D.SRF.PERSON.LITE.PHONE.LOAD
-
mqQueueName: QMD00.D.SRF.PERSON.PHONE.LOAD