_AMQ_GROUP_ID присутствует в сообщении, но JMSXGroupID нулевой в @JmsListener - PullRequest
0 голосов
/ 15 октября 2019

Из этой документации:

Сообщения в группе сообщений имеют один и тот же идентификатор группы, т. Е. Имеют одинаковое свойство идентификатора группы (JMSXGroupID для JMS, _AMQ_GROUP_ID для Apache ActiveMQ Artemis Core API).

Я понимаю, почему свойство, изначально установленное с помощью JMSXGroupID, становится _AMQ_GROUP_ID, когда я просматриваю сообщения в брокере со значением product = paper. Тем не менее, в моем @JmsListener аннотированном методе я вижу, что свойство _AMQ_GROUP_ID отсутствует, а JMSXGroupID отображается как ноль в headers хеш-карте сообщения.

@JmsListener(destination = "${artemis.destination}", subscription = "${artemis.subscriptionName}",
            containerFactory = "containerFactory", concurrency = "15-15")
public void consumeMessage(Message<StatefulSpineEvent<?>> eventMessage)

Итак

  1. Приложение My Producer отправляет сообщение в очередь после установки свойства строки JMSXGroupID для 'product = paper'
  2. Я вижу, что _AMQ_GROUP_ID имеет значение 'product = paper'когда я просматриваю заголовки этого сообщения в интерфейсе пользователя Artemis
  3. Когда я отлаживаю свое приложение слушателя и смотрю на карту заголовков, _AMQ_GROUP_ID отсутствует и JMSXGroupID имеет значение null вместо 'product = paper'.

Является ли символ' = 'недействительным или есть что-то еще, что может вызвать это? У меня заканчиваются попытки попробовать.

Редактировать с новым кодом:

HeaderMapper:

@Component
public class GroupIdMessageMapper extends SimpleJmsHeaderMapper {

    @Override
    public MessageHeaders toHeaders(Message jmsMessage) {

        MessageHeaders messageHeaders = super.toHeaders(jmsMessage);

        Map<String, Object> messageHeadersMap = new HashMap<>(messageHeaders);

        try {
            messageHeadersMap.put("JMSXGroupID", jmsMessage.getStringProperty("_AMQ_GROUP_ID"));
        } catch (JMSException e) {
            e.printStackTrace();
        }

        // can see while debugging that this returns the correct headers
        return new MessageHeaders(messageHeadersMap);
    }
}

Слушатель:

@Component
public class CustomSpringJmsListener {

    protected final Logger LOG = LoggerFactory.getLogger(getClass());

    @JmsListener(destination = "local-queue", subscription = "groupid-example",
            containerFactory = "myContainerFactory", concurrency = "15-15")
    public void receive(Message message) throws JMSException {
        LOG.info("Received message: " + message);
    }
}

Код приложения:

@SpringBootApplication
@EnableJms
public class GroupidApplication implements CommandLineRunner {

    private static Logger LOG = LoggerFactory
            .getLogger(GroupidApplication.class);

    @Autowired
    private JmsTemplate jmsTemplate;

    @Autowired MessageConverter messageConverter;

    public static void main(String[] args) {
        LOG.info("STARTING THE APPLICATION");
        SpringApplication.run(GroupidApplication.class, args);

        LOG.info("APPLICATION FINISHED");
    }

    @Override
    public void run(String... args) {
        LOG.info("EXECUTING : command line runner");

        jmsTemplate.setPubSubDomain(true);

        createAndSendObjectMessage("Message1");
        createAndSendTextMessage("Message2");
        createAndSendTextMessage("Message3");
        createAndSendTextMessage("Message4");
        createAndSendTextMessage("Message5");
        createAndSendTextMessage("Message6");
    }

    private void createAndSendTextMessage(String messageBody) {
        jmsTemplate.send("local-queue", session -> {
            Message message = session.createTextMessage(messageBody);

            message.setStringProperty("JMSXGroupID", "product=paper");

            return message;
        });
    }

    // BEANS

    @Bean
    public JmsListenerContainerFactory<?> myContainerFactory(ConnectionFactory connectionFactory,
            DefaultJmsListenerContainerFactoryConfigurer configurer) {
        DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
        // This provides all boot's default to this factory, including the message converter
        configurer.configure(factory, connectionFactory);
        // You could still override some of Boot's default if necessary.
        factory.setSubscriptionDurable(true);
        factory.setSubscriptionShared(true);
        factory.setMessageConverter(messagingMessageConverter());

        return factory;
    }

    @Bean
    public MessagingMessageConverter messagingMessageConverter() {
        return new MessagingMessageConverter(messageConverter, new GroupIdMessageMapper());
    }
}

Трассировка стека, где вызывается SimpleJmsHeaderMapper:

toHeaders: 130, SimpleJmsHeaderMapper (org.springframework.jms.support) toHeaders: 57, SimpleJmsHeaderMapper (org.springframework.jms.support) extractHeaders: 148, MessagingMessageConverter (org.springframework.jms.support.converter) имеют доступ к $ 100: 466, AbstractAdaptableMessageListener $ MessagingMessageConverterAdapter (org.springfraessageAPMapageMasterageHerver.Exter.Exter. ПеременнаяolveArgument: 117, HandlerMethodArgumentResolverComposite (org.springframework.messaging.handler.invocation) getMethodArgumentValues: 148, InvocableHandlerMethod (org.springframework. 114, MessagingMessageListenerAdapter (org.springframework.jms.listener.adapter) onMessage: 77, MessagingMessageListenerAdapter (org.springframework. org.springframework.jms.listener) doExecuteListener: 674, AbstractMessageListenerContainer (org.springframework.jms.listener) doReceiveAndExecute: 318, AbstractPollingMessageListenerContainer (org.springframework.jling. invokeListener: 1190, DefaultMessageListenerContainer $AsyncMessageListenerInvoker (org.springframework.jms.listener) executeOngoingLoop: 1180, DefaultMessageListenerContainer $ AsyncMessageListenerInvoker (org.springframework.jms.listener) run: 1077, DefaultMessageListenerInsergej. lang)

1 Ответ

1 голос
/ 15 октября 2019

Попробуйте создать подкласс SimpleJmsHeaderMapper и переопределить toHeaders(). Звоните super.toHeaders(), создайте новый Map<> из результата;put() любые дополнительные заголовки, которые вы хотите добавить на карту, и возвратите новый MessageHeaders с карты.

Передайте пользовательский преобразователь в новый MessagingMessageConverter и передайте его в фабрику контейнеров.

Если вы используете Spring Boot, просто добавьте конвертер как @Bean, и загрузчик автоматически подключит его к фабрике.

EDIT

После всего этого;Я только что написал приложение, и оно прекрасно работает безо всяких настроек ...

@SpringBootApplication
public class So58399905Application {

    public static void main(String[] args) {
        SpringApplication.run(So58399905Application.class, args);
    }

    @JmsListener(destination = "foo")
    public void listen(String in, MessageHeaders headers) {
        System.out.println(in + headers);
    }

    @Bean
    public ApplicationRunner runner(JmsTemplate template) {
        return args -> template.convertAndSend("foo", "bar", msg -> {
            msg.setStringProperty("JMSXGroupID", "product=x");
            return msg;
        });
    }

}

и

bar{jms_redelivered=false, JMSXGroupID=product=x, jms_deliveryMode=2, JMSXDeliveryCount=1,  ...

EDIT2

Это ошибка в клиенте artemis - с 2.6.4 (Boot 2.1.9) только getStringProperty() возвращает значение свойства _AMQ_GROUP_ID при получении JMSXGroupID.

Картограф использует getObjectProperty(), который возвратил ноль. С клиентом 2.10.1;сообщение правильно возвращает значение свойства _AMQ_GROUP_ID из getObjectProperty().

...