В нашем приложении мы в настоящее время используем JmsListener + AWS SQS.Некоторое время он работает хорошо, но когда случается некоторая нестабильность AWS (например, Service Unavailable), его потребитель пытается восстановиться, выключившись и перезапустившись снова, но после успешного запуска больше не потребляет сообщение.
Stacktrace:
2018-11-21 13:16:33.941 WARN 27862 --- [enerContainer-3] o.s.j.l.DefaultMessageListenerContainer : Execution of JMS message listener failed, and no ErrorHandler has been set.
javax.jms.JMSException: AmazonServiceException: deleteMessageBatch. RequestId: null
HTTPStatusCode: 503 AmazonErrorCode: 503 Service Unavailable
at com.amazon.sqs.javamessaging.AmazonSQSMessagingClientWrapper.handleException(AmazonSQSMessagingClientWrapper.java:397)
at com.amazon.sqs.javamessaging.AmazonSQSMessagingClientWrapper.deleteMessageBatch(AmazonSQSMessagingClientWrapper.java:151)
at com.amazon.sqs.javamessaging.acknowledge.RangedAcknowledger.action(RangedAcknowledger.java:145)
at com.amazon.sqs.javamessaging.acknowledge.BulkSQSOperation.bulkAction(BulkSQSOperation.java:61)
at com.amazon.sqs.javamessaging.acknowledge.RangedAcknowledger.acknowledge(RangedAcknowledger.java:70)
at com.amazon.sqs.javamessaging.message.SQSMessage.acknowledge(SQSMessage.java:883)
at org.springframework.jms.listener.AbstractMessageListenerContainer.commitIfNecessary(AbstractMessageListenerContainer.java:763)
at org.springframework.jms.listener.AbstractMessageListenerContainer.doExecuteListener(AbstractMessageListenerContainer.java:663)
at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.doReceiveAndExecute(AbstractPollingMessageListenerContainer.java:317)
at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.receiveAndExecute(AbstractPollingMessageListenerContainer.java:255)
at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.invokeListener(DefaultMessageListenerContainer.java:1166)
at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.executeOngoingLoop(DefaultMessageListenerContainer.java:1158)
at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.run(DefaultMessageListenerContainer.java:1055)
at java.lang.Thread.run(Thread.java:748)
Caused by: com.amazonaws.services.sqs.model.AmazonSQSException: null (Service: AmazonSQS; Status Code: 503; Error Code: 503 Service Unavailable; Request ID: null)
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleErrorResponse(AmazonHttpClient.java:1588)
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1258)
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1030)
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:742)
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:716)
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:699)
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:667)
at com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:649)
at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:513)
at com.amazonaws.services.sqs.AmazonSQSClient.doInvoke(AmazonSQSClient.java:1557)
at com.amazonaws.services.sqs.AmazonSQSClient.invoke(AmazonSQSClient.java:1533)
at com.amazonaws.services.sqs.AmazonSQSClient.deleteMessageBatch(AmazonSQSClient.java:777)
at com.amazon.sqs.javamessaging.AmazonSQSMessagingClientWrapper.deleteMessageBatch(AmazonSQSMessagingClientWrapper.java:149)
... 12 common frames omitted
2018-11-21 13:16:36.371 INFO 27862 --- [enerContainer-3] com.amazon.sqs.javamessaging.SQSSession : Shutting down SessionCallBackScheduler executor
2018-11-21 13:16:36.371 INFO 27862 --- [enerContainer-7] c.a.s.javamessaging.SQSMessageConsumer : Shutting down ConsumerPrefetch executor
2018-11-21 13:16:36.371 WARN 27862 --- [enerContainer-3] o.s.j.l.DefaultMessageListenerContainer : Setup of JMS message listener invoker failed for destination 'QUEUE_NAME' - trying to recover. Cause: AmazonServiceException: deleteMessageBatch. RequestId: null
HTTPStatusCode: 503 AmazonErrorCode: 503 Service Unavailable
...
2018-11-21 13:16:36.951 INFO 27862 --- [enerContainer-7] com.amazon.sqs.javamessaging.SQSSession : Shutting down SessionCallBackScheduler executor
2018-11-21 13:16:36.952 INFO 27862 --- [enerContainer-2] c.a.s.javamessaging.SQSMessageConsumer : Shutting down ConsumerPrefetch executor
2018-11-21 13:16:41.637 INFO 27862 --- [enerContainer-3] com.amazon.sqs.javamessaging.SQSSession : Shutting down SessionCallBackScheduler executor
2018-11-21 13:16:41.639 INFO 27862 --- [enerContainer-3] o.s.j.l.DefaultMessageListenerContainer : Successfully refreshed JMS Connection
Код:
class MessagingListener {
@JmsListener(destination = '${aws.queue}', containerFactory = 'jmsListenerContainerFactory')
void processMessage(String record) {
try {
log.debug("Message received: " + record)
... Business Logic...
} catch (Exception ex) {
log.error("Error parsing message", ex)
}
}
}
@Configuration
class MessagingConfiguration {
SQSConnectionFactory connectionFactory
MessagingConfiguration(@Value('${aws.accessKeyId}') String accessKeyId,
@Value('${aws.secretKey}') String secretKey,
@Value('${aws.region}') String region {
connectionFactory = SQSConnectionFactory.builder()
.withRegion(Region.getRegion(region as Regions))
.withAWSCredentialsProvider(new AWSStaticCredentialsProvider(new BasicAWSCredentials(accessKeyId, secretKey)))
.build()
}
@Bean
DefaultJmsListenerContainerFactory jmsListenerContainerFactory() {
DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory()
factory.setConnectionFactory(this.connectionFactory)
factory.setDestinationResolver(new DynamicDestinationResolver())
factory.setConcurrency("3-10")
factory.setSessionAcknowledgeMode(Session.CLIENT_ACKNOWLEDGE)
return factory
}
@Bean
MessagingListener MessagingListener() {
new MessagingListener()
}
}
Только перезапуск приложения заставляет его работать снова.Кто-нибудь испытывал это?