Попытка освободить соединение, чтобы избежать максимальной ошибки соединения клиента - PullRequest
0 голосов
/ 06 августа 2020

Я пытаюсь найти решение для следующего сообщения об ошибке, не увеличивая размер соединения по умолчанию с 1000 до 2000 или более.

Недавно я столкнулся со следующей ошибкой, когда около 1000 сообщений были отправлены на брокера с задержкой в ​​5 минут, как показано в приведенном ниже коде.

WARN  | Could not accept connection  : Exceeded the maximum number of allowed client connections. See the 'maximumConnections' property on the TCP transport configuration URI in the ActiveMQ configuration file (e.g., activemq.xml) | org.apache.activemq.broker.TransportConnector | ActiveMQ Transport Server Thread Handler: tcp://0.0.0.0:61616?maximumConnections=1000&wireFormat.maxFrameSize=104857600

Ниже приведен код, который постоянно слушает ActiveMQ, и как только он видит COMPLETE, он отправляет электронное письмо на пользователь после создания файла. В противном случае он входит в блок else и снова отправляет сообщение брокеру.

Внутри блока else я хочу протестировать, закрыв соединение после того, как я закончу отправку сообщение. Итак, я закрыл соединение внутри блока finally, как показано ниже. Это правильный способ go об этом?

@Component
public class DownloadConsumer {
    
    @Autowired
    private JavaMailSender javaMailSender;
    
    // one instance, reuse
    private final CloseableHttpClient httpClient = HttpClients.createDefault();
    
    Connection connection;
            
    
    // Working Code with JMS 2.0
    @JmsListener(destination = "MessageProducerJMSV1")
        public void processBrokerQueues(String message) throws DaoException, JMSException {
            
        
         try {
            
            RequestDao requestDao = (RequestDao) context.getBean("requestDao");
            
            String receivedStatus = requestDao.getRequestStatus(message);
            
            
             
            //Retrieve Username from the message to include in an email
             String[] parts = message.split("#");
             String userName = parts[1].trim();
             
            //Retrieve personnelID from the message to include in the webservice calls
            
             String personnelID = parts[3].trim();
            
            
            
            
            //Before sending this message, do the check for COMPLETE or ERROR etc
            if(receivedStatus.equals("COMPLETE")) {
                
                
                
                String latestUUID = requestDao.getUUID();
                
                logger.info("Received UUID in Controller is as follows! ");
                logger.info(latestUUID);
                
                requestDao.sendMessage(message,latestUUID);
                logger.info("Received status is COMPLETE! ");
                logger.info("Sending email to the user! ");
                String emailMessage = "Dear "+userName+",<p>Your files are ready. </p><p> Thanks,<br/> Jack/p>";
                String recipientEmail = userName+"@organization.com";
                
                
                
                
                /*****************************************************\
                // START: EMAIL Related Code
                
                 *******************************************************/
                
                MimeMessage msg = javaMailSender.createMimeMessage();
                 MimeMessageHelper helper = new MimeMessageHelper(msg, true);
                 helper.setFrom("ABCResearch@organization.com");
                 helper.setTo(recipientEmail);
                 helper.setSubject("Requested Files !");
                 helper.setText(emailMessage,true);
                 
                 javaMailSender.send(msg);
                 
                
                    
                                
            }
            else {
                
                
                // Getting JMS connection from the server and starting it
                ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);
                connection = connectionFactory.createConnection();
                connection.start();
                
                Session session = connection.createSession(false,
                        Session.AUTO_ACKNOWLEDGE);
                
                // Destination represents here our queue 'MessageProducerJMSV1' on the  JMS server
                Destination destination = session.createQueue(subject);
                
                
                MessageProducer producer = session.createProducer(destination);
                
                //Sending message to the queue
                TextMessage toSendMessage = session.createTextMessage(message);
                
                long delay = 300 * 1000;
                long period =300 * 1000;
                
                toSendMessage.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, delay);
                
                producer.send(toSendMessage);
                
                
                
                
            }
            
            }
            catch(Throwable th){
                th.printStackTrace();   
                
            }
            finally {
            
                connection.close();
                
            }
            
         }
   // URL of the JMS server. DEFAULT_BROKER_URL will just mean that JMS server is on localhost
    private static String url = ActiveMQConnection.DEFAULT_BROKER_URL;
    private static String subject = "MessageProducerJMSV1"; //Queue Name
    // default broker URL is : tcp://localhost:61616"
    
    private static ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext("ApplicationContext.xml");
    private static final Logger logger = LoggerFactory.getLogger(DownloadConsumer.class);
    
    

}

Ответы [ 2 ]

2 голосов
/ 07 августа 2020

Причина, по которой вы получаете сообщение «Превышено максимальное количество разрешенных клиентских подключений», заключается в том, что вы создаете подключения, а не закрываете их. Другими словами, ваше приложение «пропускает» соединения. Для устранения утечки необходимо закрыть соединение. Закрытие JMS-соединения в блоке finally является общепринятой практикой, поэтому ваш код там хорошо выглядит. Однако вам необходимо проверить наличие null на случай возникновения проблемы до того, как connection будет фактически создан, например:

finally {
    if (connection != null) {
        connection.close();
    }                
}

Тем не менее, стоит отметить, что создание и закрытие JMS-соединения & сеанс и производитель для отправки сообщения single - это хорошо известный антишаблон. Было бы лучше, если бы вы кэшировали соединение (например, в переменной static) и использовали его повторно. Например:

@Component
public class DownloadConsumer {

   @Autowired
   private JavaMailSender javaMailSender;

   // one instance, reuse
   private final CloseableHttpClient httpClient = HttpClients.createDefault();

   private static Connection connection;

   private static Object connectionLock = new Object();

   // URL of the JMS server. DEFAULT_BROKER_URL will just mean that JMS server is on localhost
   private static String url = ActiveMQConnection.DEFAULT_BROKER_URL;
   private static String subject = "MessageProducerJMSV1"; //Queue Name
   // default broker URL is : tcp://localhost:61616"

   private static ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext("ApplicationContext.xml");
   private static final Logger logger = LoggerFactory.getLogger(DownloadConsumer.class);

   // Working Code with JMS 2.0
   @JmsListener(destination = "MessageProducerJMSV1")
   public void processBrokerQueues(String message) throws DaoException, JMSException {

      try {

         RequestDao requestDao = (RequestDao) context.getBean("requestDao");

         String receivedStatus = requestDao.getRequestStatus(message);

         //Retrieve Username from the message to include in an email
         String[] parts = message.split("#");
         String userName = parts[1].trim();

         //Retrieve personnelID from the message to include in the webservice calls

         String personnelID = parts[3].trim();

         //Before sending this message, do the check for COMPLETE or ERROR etc
         if (receivedStatus.equals("COMPLETE")) {

            String latestUUID = requestDao.getUUID();

            logger.info("Received UUID in Controller is as follows! ");
            logger.info(latestUUID);

            requestDao.sendMessage(message, latestUUID);
            logger.info("Received status is COMPLETE! ");
            logger.info("Sending email to the user! ");
            String emailMessage = "Dear " + userName + ",<p>Your files are ready. </p><p> Thanks,<br/> Jack/p>";
            String recipientEmail = userName + "@organization.com";

            /*****************************************************\
             // START: EMAIL Related Code
             *******************************************************/

            MimeMessage msg = javaMailSender.createMimeMessage();
            MimeMessageHelper helper = new MimeMessageHelper(msg, true);
            helper.setFrom("ABCResearch@organization.com");
            helper.setTo(recipientEmail);
            helper.setSubject("Requested Files !");
            helper.setText(emailMessage, true);

            javaMailSender.send(msg);

         } else {
            try {
               createConnection();

               Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

               // Destination represents here our queue 'MessageProducerJMSV1' on the  JMS server
               Destination destination = session.createQueue(subject);

               MessageProducer producer = session.createProducer(destination);

               //Sending message to the queue
               TextMessage toSendMessage = session.createTextMessage(message);

               long delay = 300 * 1000;
               long period = 300 * 1000;

               toSendMessage.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, delay);

               producer.send(toSendMessage);
            } catch (Throwable th) {
               th.printStackTrace();
               
               synchronized (connectionLock) {
                  // if there are any problems close the connection and it will be re-created next time
                  if (connection != null) {
                     connection.close();
                     connection = null;
                  }
               }
            }
         }
      } catch (Throwable th) {
         th.printStackTrace();
      }
   }
   
   private void createConnection() {
      synchronized (connectionLock) {
         if (connection == null) {
            // Getting JMS connection from the server
            ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);
            connection = connectionFactory.createConnection();
         }
      }
   }
}

Обратите внимание, что в этом коде нет блока finally для закрытия соединения. Это сделано намеренно, потому что весь смысл этого кода - держать соединение открытым , чтобы он не открывал и не закрывал соединение для отправки одного сообщения. Соединение повторно используется между вызовами. Единственный раз, когда соединение закрывается, - это Throwable.

Также имейте в виду, что нет причин вызывать start() в соединении JMS, если оно просто отправляет сообщение. Метод start() влияет только на потребителей .

0 голосов
/ 06 августа 2020

Проблема заключается в приведенном ниже коде, который будет открывать новое соединение при каждом сообщении - в идеале вы должны сделать это в идеале, чтобы вызывать это один раз (и снова, если соединение истекает).

// Getting JMS connection from the server and starting it
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);
connection = connectionFactory.createConnection();
connection.start();

получить сеанс из него и закрыть сессия. В зависимости от использования вы даже можете поддерживать сеанс дольше.

...