Из этой документации:
Сообщения в группе сообщений имеют один и тот же идентификатор группы, т. Е. Имеют одинаковое свойство идентификатора группы (JMSXGroupID для JMS, _AMQ_GROUP_ID для Apache ActiveMQ Artemis Core API).
Я понимаю, почему свойство, изначально установленное с помощью JMSXGroupID
, становится _AMQ_GROUP_ID
, когда я просматриваю сообщения в брокере со значением product = paper. Тем не менее, в моем @JmsListener
аннотированном методе я вижу, что свойство _AMQ_GROUP_ID
отсутствует, а JMSXGroupID
отображается как ноль в headers
хеш-карте сообщения.
@JmsListener(destination = "${artemis.destination}", subscription = "${artemis.subscriptionName}",
containerFactory = "containerFactory", concurrency = "15-15")
public void consumeMessage(Message<StatefulSpineEvent<?>> eventMessage)
Итак
- Приложение My Producer отправляет сообщение в очередь после установки свойства строки
JMSXGroupID
для 'product = paper' - Я вижу, что
_AMQ_GROUP_ID
имеет значение 'product = paper'когда я просматриваю заголовки этого сообщения в интерфейсе пользователя Artemis - Когда я отлаживаю свое приложение слушателя и смотрю на карту заголовков,
_AMQ_GROUP_ID
отсутствует и JMSXGroupID
имеет значение null вместо 'product = paper'.
Является ли символ' = 'недействительным или есть что-то еще, что может вызвать это? У меня заканчиваются попытки попробовать.
Редактировать с новым кодом:
HeaderMapper:
@Component
public class GroupIdMessageMapper extends SimpleJmsHeaderMapper {
@Override
public MessageHeaders toHeaders(Message jmsMessage) {
MessageHeaders messageHeaders = super.toHeaders(jmsMessage);
Map<String, Object> messageHeadersMap = new HashMap<>(messageHeaders);
try {
messageHeadersMap.put("JMSXGroupID", jmsMessage.getStringProperty("_AMQ_GROUP_ID"));
} catch (JMSException e) {
e.printStackTrace();
}
// can see while debugging that this returns the correct headers
return new MessageHeaders(messageHeadersMap);
}
}
Слушатель:
@Component
public class CustomSpringJmsListener {
protected final Logger LOG = LoggerFactory.getLogger(getClass());
@JmsListener(destination = "local-queue", subscription = "groupid-example",
containerFactory = "myContainerFactory", concurrency = "15-15")
public void receive(Message message) throws JMSException {
LOG.info("Received message: " + message);
}
}
Код приложения:
@SpringBootApplication
@EnableJms
public class GroupidApplication implements CommandLineRunner {
private static Logger LOG = LoggerFactory
.getLogger(GroupidApplication.class);
@Autowired
private JmsTemplate jmsTemplate;
@Autowired MessageConverter messageConverter;
public static void main(String[] args) {
LOG.info("STARTING THE APPLICATION");
SpringApplication.run(GroupidApplication.class, args);
LOG.info("APPLICATION FINISHED");
}
@Override
public void run(String... args) {
LOG.info("EXECUTING : command line runner");
jmsTemplate.setPubSubDomain(true);
createAndSendObjectMessage("Message1");
createAndSendTextMessage("Message2");
createAndSendTextMessage("Message3");
createAndSendTextMessage("Message4");
createAndSendTextMessage("Message5");
createAndSendTextMessage("Message6");
}
private void createAndSendTextMessage(String messageBody) {
jmsTemplate.send("local-queue", session -> {
Message message = session.createTextMessage(messageBody);
message.setStringProperty("JMSXGroupID", "product=paper");
return message;
});
}
// BEANS
@Bean
public JmsListenerContainerFactory<?> myContainerFactory(ConnectionFactory connectionFactory,
DefaultJmsListenerContainerFactoryConfigurer configurer) {
DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
// This provides all boot's default to this factory, including the message converter
configurer.configure(factory, connectionFactory);
// You could still override some of Boot's default if necessary.
factory.setSubscriptionDurable(true);
factory.setSubscriptionShared(true);
factory.setMessageConverter(messagingMessageConverter());
return factory;
}
@Bean
public MessagingMessageConverter messagingMessageConverter() {
return new MessagingMessageConverter(messageConverter, new GroupIdMessageMapper());
}
}
Трассировка стека, где вызывается SimpleJmsHeaderMapper:
toHeaders: 130, SimpleJmsHeaderMapper (org.springframework.jms.support) toHeaders: 57, SimpleJmsHeaderMapper (org.springframework.jms.support) extractHeaders: 148, MessagingMessageConverter (org.springframework.jms.support.converter) имеют доступ к $ 100: 466, AbstractAdaptableMessageListener $ MessagingMessageConverterAdapter (org.springfraessageAPMapageMasterageHerver.Exter.Exter. ПеременнаяolveArgument: 117, HandlerMethodArgumentResolverComposite (org.springframework.messaging.handler.invocation) getMethodArgumentValues: 148, InvocableHandlerMethod (org.springframework. 114, MessagingMessageListenerAdapter (org.springframework.jms.listener.adapter) onMessage: 77, MessagingMessageListenerAdapter (org.springframework. org.springframework.jms.listener) doExecuteListener: 674, AbstractMessageListenerContainer (org.springframework.jms.listener) doReceiveAndExecute: 318, AbstractPollingMessageListenerContainer (org.springframework.jling. invokeListener: 1190, DefaultMessageListenerContainer $AsyncMessageListenerInvoker (org.springframework.jms.listener) executeOngoingLoop: 1180, DefaultMessageListenerContainer $ AsyncMessageListenerInvoker (org.springframework.jms.listener) run: 1077, DefaultMessageListenerInsergej. lang)