Память для воспроизведения / синхронных сообщений не освобождена для сообщений, отправленных производителями в очередь в SpringBoot JMS с ActiveMQ - PullRequest
0 голосов
/ 21 июня 2020

1. Контекст:

Двухмодульное / микросервисное приложение, разработанное с помощью SpringBoot 2.3.0 и ActiveMQ. Также мы используем сервер / брокер ActiveMQ 5.15.13. Брокер определяется в обоих модулях со свойствами приложения. Кроме того, пул соединений с брокером определен в обоих модулях, а также в свойствах приложения и добавлен в оба модуля зависимости артефактов pooled-jms (с maven):

spring.activemq.broker-url=xxx
spring.activemq.user=xxx
spring.activemq.password=xx
spring.activemq.non-blocking-redelivery=true
spring.activemq.pool.enabled=true
spring.activemq.pool.time-between-expiration-check=5s
spring.activemq.pool.max-connections=10
spring.activemq.pool.max-sessions-per-connection=10
spring.activemq.pool.idle-timeout=60s

Другие конфигурации для JMS, которые я сделал:

spring.jms.listener.acknowledge-mode=auto
spring.jms.listener.auto-startup=true
spring.jms.listener.concurrency=5
spring.jms.listener.max-concurrency=10
spring.jms.pub-sub-domain=false
spring.jms.template.priority=100
spring.jms.template.qos-enabled=true
spring.jms.template.delivery-mode=persistent

В модуле 1 JmsTemplate используется для отправки синхронных сообщений (или мы также можем назвать replay-messages). Я выбрал правильную очередь вместо временной, так как понимаю, что если отправлено много сообщений, то временную очередь не рекомендуется использовать для повторов - вот что я сделал.

2. Примеры кода:

МОДУЛЬ 1:

@Value("${app.request-video.jms.queue.name}")
private String requestVideoQueueNameAppProperty;
@Bean
public Queue requestVideoJmsQueue() {
    logger.info("Initializing requestVideoJmsQueue using application property value for " +
            "app.request-video.jms.queue.name=" + requestVideoQueueNameAppProperty);
    return new ActiveMQQueue(requestVideoQueueNameAppProperty);
}

@Value("${app.request-video-replay.jms.queue.name}")
private String requestVideoReplayQueueNameAppProperty;
@Bean
public Queue requestVideoReplayJmsQueue() {
    logger.info("Initializing requestVideoReplayJmsQueue using application property value for " +
            "app.request-video-replay.jms.queue.name=" + requestVideoReplayQueueNameAppProperty);
    return new ActiveMQQueue(requestVideoReplayQueueNameAppProperty);
}

@Autowired
private JmsTemplate jmsTemplate;

public Message callSendAndReceive(TextJMSMessageDTO messageDTO, Destination jmsDestination, Destination jmsReplay) {

return jmsTemplate.sendAndReceive(jmsDestination, jmsSession -> {
        try {
            TextMessage textMessage = jmsSession.createTextMessage();
            textMessage.setText(messageDTO.getText());
            textMessage.setJMSReplyTo(jmsReplay);
            textMessage.setJMSCorrelationID(UUID.randomUUID().toString());
            textMessage.setJMSDeliveryMode(DeliveryMode.NON_PERSISTENT);
            return textMessage;
        } catch (IOException e) {
            logger.error("Error sending JMS message to destination: " + jmsDestination, e);
            throw new JMSException("Error sending JMS message to destination: " + jmsDestination);
        }
    });
}

МОДУЛЬ 2:

@JmsListener(destination = "${app.backend-get-request-video.jms.queue.name}")

public void onBackendGetRequestsVideoMessage(TextMessage message, Session session) throws JMSException, IOException {
    logger.info("Get requests video file message consumed!");
    try {
        Object replayObject = handleReplayAction(message);
        JMSMessageDTO messageDTO = messageDTOFactory.getJMSMessageDTO(replayObject);
        Message replayMessage = messageFactory.getJMSMessage(messageDTO, session);

        BytesMessage replayBytesMessage = jmsSession.createBytesMessage();
        fillByteMessageFromMediaDTO(replayBytesMessage, mediaMessageDTO);
        replayBytesMessage.setJMSCorrelationID(message.getJMSCorrelationID());
        final MessageProducer producer = session.createProducer(message.getJMSReplyTo());
        producer.send(replayBytesMessage);
        JmsUtils.closeMessageProducer(producer);
    } catch (JMSException | IOException e) {
        logger.error("onBackendGetRequestsVideoMessage()JMSException: " + e.getMessage(), e);
        throw e;
    }
}

private void fillByteMessageFromMediaDTO(BytesMessage bytesMessage, MediaJMSMessageDTO mediaMessageDTO)
        throws IOException, JMSException {
    String filePath = fileStorageConfiguration.getMediaFilePath(mediaMessageDTO);
    FileInputStream fileInputStream = null;
    try (FileInputStream fileInputStream = new FileInputStream(filePath)) {
        byte[] byteBuffer = new byte[1024];
        int bytes_read = 0;
        while ((bytes_read = fileInputStream.read(byteBuffer)) != -1) {
            bytesMessage.writeBytes(byteBuffer, 0, bytes_read);
        }
    } catch (JMSException e) {
        logger.error("Can not write data in JMS ByteMessage from file: " + fileName, e);
    } catch (FileNotFoundException e) {
        logger.error("Can not open stream to file: " + fileName, e);
    } catch (IOException e) {
        logger.error("Can not read data from file: " + fileName, e);
    }
}

3. Проблема:

Поскольку я отправляю много сообщений и получаю много соответствующих повторов через Producer / comsumer / JmsTamplate, оба прикладных модуля 1 и 2 быстро заполняют выделенную память кучи до тех пор, пока не возникнет ошибка нехватки памяти выбрасывается, но утечка памяти появляется только при использовании синхронных сообщений с воспроизведением, как показано выше.

Я отладил свой код, и все экземпляры (сеанс, производители, потребители, jmsTamplate, et c) объединены в пул и иметь экземпляры нужных классов из библиотеки pooled-jms; так что пул, по-видимому, должен работать правильно. Я сделал дамп кучи второго модуля, и похоже, что сообщения производителей (ActiveMQBytesMessage) все еще находятся в памяти даже спустя долгое время после того, как они были успешно использованы правильным потребителем.

У меня также есть асинхронные сообщения, отправленные в мои модули и швы, что эти сообщения производитель-потребитель работает хорошо; проблема присутствует только для сообщений синхронизации / воспроизведения производитель-потребитель.

Примеры файлов дампа кучи - взятые после полной ночи бездействия приложения - как показано ниже:

  1. модуль 1 module_1_dump
  2. module 2 module_2_dump
  3. брокер / сервер activemq activemq_dump

У кого-нибудь есть идеи что я делаю не так ?!

...