Spring Boot + SQS does not recover after AWS instability
November 23, 2018

In our application we currently use JmsListener with AWS SQS. It works well for a while, but when AWS becomes unstable (for example, Service Unavailable), the consumer tries to recover by shutting down and restarting. The restart succeeds, but after it the consumer no longer consumes any messages.

Stacktrace:

        2018-11-21 13:16:33.941  WARN 27862 --- [enerContainer-3] o.s.j.l.DefaultMessageListenerContainer  : Execution of JMS message listener failed, and no ErrorHandler has been set.

    javax.jms.JMSException: AmazonServiceException: deleteMessageBatch. RequestId: null
    HTTPStatusCode: 503 AmazonErrorCode: 503 Service Unavailable
        at com.amazon.sqs.javamessaging.AmazonSQSMessagingClientWrapper.handleException(AmazonSQSMessagingClientWrapper.java:397)
        at com.amazon.sqs.javamessaging.AmazonSQSMessagingClientWrapper.deleteMessageBatch(AmazonSQSMessagingClientWrapper.java:151)
        at com.amazon.sqs.javamessaging.acknowledge.RangedAcknowledger.action(RangedAcknowledger.java:145)
        at com.amazon.sqs.javamessaging.acknowledge.BulkSQSOperation.bulkAction(BulkSQSOperation.java:61)
        at com.amazon.sqs.javamessaging.acknowledge.RangedAcknowledger.acknowledge(RangedAcknowledger.java:70)
        at com.amazon.sqs.javamessaging.message.SQSMessage.acknowledge(SQSMessage.java:883)
        at org.springframework.jms.listener.AbstractMessageListenerContainer.commitIfNecessary(AbstractMessageListenerContainer.java:763)
        at org.springframework.jms.listener.AbstractMessageListenerContainer.doExecuteListener(AbstractMessageListenerContainer.java:663)
        at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.doReceiveAndExecute(AbstractPollingMessageListenerContainer.java:317)
        at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.receiveAndExecute(AbstractPollingMessageListenerContainer.java:255)
        at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.invokeListener(DefaultMessageListenerContainer.java:1166)
        at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.executeOngoingLoop(DefaultMessageListenerContainer.java:1158)
        at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.run(DefaultMessageListenerContainer.java:1055)
        at java.lang.Thread.run(Thread.java:748)
    Caused by: com.amazonaws.services.sqs.model.AmazonSQSException: null (Service: AmazonSQS; Status Code: 503; Error Code: 503 Service Unavailable; Request ID: null)
        at com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleErrorResponse(AmazonHttpClient.java:1588)
        at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1258)
        at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1030)
        at com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:742)
        at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:716)
        at com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:699)
        at com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:667)
        at com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:649)
        at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:513)
        at com.amazonaws.services.sqs.AmazonSQSClient.doInvoke(AmazonSQSClient.java:1557)
        at com.amazonaws.services.sqs.AmazonSQSClient.invoke(AmazonSQSClient.java:1533)
        at com.amazonaws.services.sqs.AmazonSQSClient.deleteMessageBatch(AmazonSQSClient.java:777)
        at com.amazon.sqs.javamessaging.AmazonSQSMessagingClientWrapper.deleteMessageBatch(AmazonSQSMessagingClientWrapper.java:149)
        ... 12 common frames omitted
    2018-11-21 13:16:36.371  INFO 27862 --- [enerContainer-3] com.amazon.sqs.javamessaging.SQSSession  : Shutting down SessionCallBackScheduler executor
    2018-11-21 13:16:36.371  INFO 27862 --- [enerContainer-7] c.a.s.javamessaging.SQSMessageConsumer   : Shutting down ConsumerPrefetch executor
    2018-11-21 13:16:36.371  WARN 27862 --- [enerContainer-3] o.s.j.l.DefaultMessageListenerContainer  : Setup of JMS message listener invoker failed for destination 'QUEUE_NAME' - trying to recover. Cause: AmazonServiceException: deleteMessageBatch. RequestId: null
    HTTPStatusCode: 503 AmazonErrorCode: 503 Service Unavailable
...
    2018-11-21 13:16:36.951  INFO 27862 --- [enerContainer-7] com.amazon.sqs.javamessaging.SQSSession  : Shutting down SessionCallBackScheduler executor
    2018-11-21 13:16:36.952  INFO 27862 --- [enerContainer-2] c.a.s.javamessaging.SQSMessageConsumer   : Shutting down ConsumerPrefetch executor
    2018-11-21 13:16:41.637  INFO 27862 --- [enerContainer-3] com.amazon.sqs.javamessaging.SQSSession  : Shutting down SessionCallBackScheduler executor
    2018-11-21 13:16:41.639  INFO 27862 --- [enerContainer-3] o.s.j.l.DefaultMessageListenerContainer  : Successfully refreshed JMS Connection

Code:

class MessagingListener {

  @JmsListener(destination = '${aws.queue}', containerFactory = 'jmsListenerContainerFactory')
  void processMessage(String record) {
    try {
      log.debug("Message received: " + record)
      // ... Business Logic...
    } catch (Exception ex) {
      log.error("Error parsing message", ex)
    }
  }

}

@Configuration
class MessagingConfiguration {
    SQSConnectionFactory connectionFactory

    MessagingConfiguration(@Value('${aws.accessKeyId}') String accessKeyId,
                            @Value('${aws.secretKey}') String secretKey,
                            @Value('${aws.region}') String region) {
        connectionFactory = SQSConnectionFactory.builder()
                .withRegion(Region.getRegion(region as Regions))
                .withAWSCredentialsProvider(new AWSStaticCredentialsProvider(new BasicAWSCredentials(accessKeyId, secretKey)))
                .build()
    }

    @Bean
    DefaultJmsListenerContainerFactory jmsListenerContainerFactory() {
        DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory()
        factory.setConnectionFactory(this.connectionFactory)
        factory.setDestinationResolver(new DynamicDestinationResolver())
        factory.setConcurrency("3-10")
        factory.setSessionAcknowledgeMode(Session.CLIENT_ACKNOWLEDGE)
        return factory
    }

    @Bean
    MessagingListener MessagingListener() {
        new MessagingListener()
    }
}

Only restarting the application makes it work again. Has anyone experienced this?
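
For context, the first warning in the log above says that no ErrorHandler has been set on the listener container. Below is a minimal sketch of how one could be registered on the same kind of factory, together with an explicit back-off between recovery attempts. It is not a confirmed fix for the stalled consumer: as far as I understand, the handler only makes the failure visible in one place and the container still goes through its own recovery afterwards. The class name `MessagingErrorHandlingSketch`, the injected `ConnectionFactory` parameter (the configuration above keeps it in a field instead), the log message, and the 5-second interval are all assumptions made for illustration.

```groovy
import org.slf4j.Logger
import org.slf4j.LoggerFactory
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.jms.config.DefaultJmsListenerContainerFactory
import org.springframework.jms.support.destination.DynamicDestinationResolver
import org.springframework.util.ErrorHandler
import org.springframework.util.backoff.FixedBackOff

import javax.jms.ConnectionFactory
import javax.jms.Session

@Configuration
class MessagingErrorHandlingSketch {   // hypothetical name, not from the original configuration

    private static final Logger log = LoggerFactory.getLogger(MessagingErrorHandlingSketch)

    @Bean
    DefaultJmsListenerContainerFactory jmsListenerContainerFactory(ConnectionFactory connectionFactory) {
        DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory()
        factory.setConnectionFactory(connectionFactory)
        factory.setDestinationResolver(new DynamicDestinationResolver())
        factory.setConcurrency("3-10")
        factory.setSessionAcknowledgeMode(Session.CLIENT_ACKNOWLEDGE)

        // Called for exceptions raised while a message is being processed or acknowledged,
        // such as the 503 from deleteMessageBatch in the stack trace above.
        factory.setErrorHandler({ Throwable t ->
            log.warn("JMS listener failed: " + t.message, t)
        } as ErrorHandler)

        // Wait 5 seconds between recovery attempts and keep retrying indefinitely
        // (illustrative values; these match Spring's documented defaults).
        factory.setBackOff(new FixedBackOff(5000L, FixedBackOff.UNLIMITED_ATTEMPTS))

        return factory
    }
}
```

With a handler in place, the failure is logged with its full cause instead of the generic "no ErrorHandler has been set" warning, which may help show what state the consumers are left in after the connection is refreshed.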

...