1. Контекст:
Двухмодульное / микросервисное приложение, разработанное с помощью SpringBoot 2.3.0 и ActiveMQ. Также мы используем сервер / брокер ActiveMQ 5.15.13. Брокер определяется в обоих модулях со свойствами приложения. Кроме того, пул соединений с брокером определен в обоих модулях, а также в свойствах приложения и добавлен в оба модуля зависимости артефактов pooled-jms (с maven):
spring.activemq.broker-url=xxx
spring.activemq.user=xxx
spring.activemq.password=xx
spring.activemq.non-blocking-redelivery=true
spring.activemq.pool.enabled=true
spring.activemq.pool.time-between-expiration-check=5s
spring.activemq.pool.max-connections=10
spring.activemq.pool.max-sessions-per-connection=10
spring.activemq.pool.idle-timeout=60s
Другие конфигурации для JMS, которые я сделал:
spring.jms.listener.acknowledge-mode=auto
spring.jms.listener.auto-startup=true
spring.jms.listener.concurrency=5
spring.jms.listener.max-concurrency=10
spring.jms.pub-sub-domain=false
spring.jms.template.priority=100
spring.jms.template.qos-enabled=true
spring.jms.template.delivery-mode=persistent
В модуле 1 JmsTemplate используется для отправки синхронных сообщений (или мы также можем назвать replay-messages). Я выбрал правильную очередь вместо временной, так как понимаю, что если отправлено много сообщений, то временную очередь не рекомендуется использовать для повторов - вот что я сделал.
2. Примеры кода:
МОДУЛЬ 1:
@Value("${app.request-video.jms.queue.name}")
private String requestVideoQueueNameAppProperty;
@Bean
public Queue requestVideoJmsQueue() {
logger.info("Initializing requestVideoJmsQueue using application property value for " +
"app.request-video.jms.queue.name=" + requestVideoQueueNameAppProperty);
return new ActiveMQQueue(requestVideoQueueNameAppProperty);
}
@Value("${app.request-video-replay.jms.queue.name}")
private String requestVideoReplayQueueNameAppProperty;
@Bean
public Queue requestVideoReplayJmsQueue() {
logger.info("Initializing requestVideoReplayJmsQueue using application property value for " +
"app.request-video-replay.jms.queue.name=" + requestVideoReplayQueueNameAppProperty);
return new ActiveMQQueue(requestVideoReplayQueueNameAppProperty);
}
@Autowired
private JmsTemplate jmsTemplate;
public Message callSendAndReceive(TextJMSMessageDTO messageDTO, Destination jmsDestination, Destination jmsReplay) {
return jmsTemplate.sendAndReceive(jmsDestination, jmsSession -> {
try {
TextMessage textMessage = jmsSession.createTextMessage();
textMessage.setText(messageDTO.getText());
textMessage.setJMSReplyTo(jmsReplay);
textMessage.setJMSCorrelationID(UUID.randomUUID().toString());
textMessage.setJMSDeliveryMode(DeliveryMode.NON_PERSISTENT);
return textMessage;
} catch (IOException e) {
logger.error("Error sending JMS message to destination: " + jmsDestination, e);
throw new JMSException("Error sending JMS message to destination: " + jmsDestination);
}
});
}
МОДУЛЬ 2:
@JmsListener(destination = "${app.backend-get-request-video.jms.queue.name}")
public void onBackendGetRequestsVideoMessage(TextMessage message, Session session) throws JMSException, IOException {
logger.info("Get requests video file message consumed!");
try {
Object replayObject = handleReplayAction(message);
JMSMessageDTO messageDTO = messageDTOFactory.getJMSMessageDTO(replayObject);
Message replayMessage = messageFactory.getJMSMessage(messageDTO, session);
BytesMessage replayBytesMessage = jmsSession.createBytesMessage();
fillByteMessageFromMediaDTO(replayBytesMessage, mediaMessageDTO);
replayBytesMessage.setJMSCorrelationID(message.getJMSCorrelationID());
final MessageProducer producer = session.createProducer(message.getJMSReplyTo());
producer.send(replayBytesMessage);
JmsUtils.closeMessageProducer(producer);
} catch (JMSException | IOException e) {
logger.error("onBackendGetRequestsVideoMessage()JMSException: " + e.getMessage(), e);
throw e;
}
}
private void fillByteMessageFromMediaDTO(BytesMessage bytesMessage, MediaJMSMessageDTO mediaMessageDTO)
throws IOException, JMSException {
String filePath = fileStorageConfiguration.getMediaFilePath(mediaMessageDTO);
FileInputStream fileInputStream = null;
try (FileInputStream fileInputStream = new FileInputStream(filePath)) {
byte[] byteBuffer = new byte[1024];
int bytes_read = 0;
while ((bytes_read = fileInputStream.read(byteBuffer)) != -1) {
bytesMessage.writeBytes(byteBuffer, 0, bytes_read);
}
} catch (JMSException e) {
logger.error("Can not write data in JMS ByteMessage from file: " + fileName, e);
} catch (FileNotFoundException e) {
logger.error("Can not open stream to file: " + fileName, e);
} catch (IOException e) {
logger.error("Can not read data from file: " + fileName, e);
}
}
3. Проблема:
Поскольку я отправляю много сообщений и получаю много соответствующих повторов через Producer / comsumer / JmsTamplate, оба прикладных модуля 1 и 2 быстро заполняют выделенную память кучи до тех пор, пока не возникнет ошибка нехватки памяти выбрасывается, но утечка памяти появляется только при использовании синхронных сообщений с воспроизведением, как показано выше.
Я отладил свой код, и все экземпляры (сеанс, производители, потребители, jmsTamplate, et c) объединены в пул и иметь экземпляры нужных классов из библиотеки pooled-jms; так что пул, по-видимому, должен работать правильно. Я сделал дамп кучи второго модуля, и похоже, что сообщения производителей (ActiveMQBytesMessage) все еще находятся в памяти даже спустя долгое время после того, как они были успешно использованы правильным потребителем.
У меня также есть асинхронные сообщения, отправленные в мои модули и швы, что эти сообщения производитель-потребитель работает хорошо; проблема присутствует только для сообщений синхронизации / воспроизведения производитель-потребитель.
Примеры файлов дампа кучи - взятые после полной ночи бездействия приложения - как показано ниже:
- модуль 1 module_1_dump
- module 2 module_2_dump
- брокер / сервер activemq activemq_dump
У кого-нибудь есть идеи что я делаю не так ?!