Обработка временных исключений с помощью Spring Data Couchbase - PullRequest
1 голос
/ 01 июля 2019

У меня очень простое приложение Spring Boot, которое получает данные из внешней системы и сохраняет их в Couchbase. Я пытаюсь обработать сценарий, в котором мое приложение испытывает временную проблему с подключением к базе данных.

Я использую Couchbase 6.0 и spring-data-couchbase версию 3.1.2.RELEASE в своем приложении и пытаюсь реализовать RetryTemplate с CustomExceptionClassifierRetryPolicy, как показано ниже:

retryTemplate.setRetryPolicy(new CustomExceptionClassifierRetryPolicy());

Ниже приведена реализация моего CustomExceptionClassifierRetryPolicy

public class CustomExceptionClassifierRetryPolicy extends ExceptionClassifierRetryPolicy {
private static final long serialVersionUID = 1L;

public CustomExceptionClassifierRetryPolicy() {
    final AlwaysRetryPolicy alwaysRetryPolicy = new AlwaysRetryPolicy();
    this.setExceptionClassifier(new Classifier<Throwable, RetryPolicy>() {
        @Override
        public RetryPolicy classify(Throwable classifiable) {
            if (classifiable.getCause() instanceof TransientDataAccessException) {
                return alwaysRetryPolicy;
            }
            return new NeverRetryPolicy();
        }
    });
}

}

В приведенной выше реализации я устанавливаю AlwaysRetryPolicy для обработки всех проблем, связанных с переходной сетью, которые могут быть решены через некоторое время.

Проблема в том, что когда я выключаю Couchbase и пытаюсь вставить документы (просто для имитации проблемы с сетью), я получаю исключение CouchbaseDataIntegrityViolationException (см. Ниже трассировку стека), которое я вижу из документов подкласс NonTransientDataAccessException. Это полностью отрицает цель определения CustomExceptionClassifierRetryPolicy.

org.springframework.data.couchbase.core.CouchbaseDataIntegrityViolationException: Insert document failed: java.util.concurrent.TimeoutException: {"b":"LWS_ORDER_AUDIT","s":"kv","t":2500000,"i":"0x88"}; nested exception is java.lang.RuntimeException: java.util.concurrent.TimeoutException: {"b":"LWS_ORDER_AUDIT","s":"kv","t":2500000,"i":"0x88"}
    at org.springframework.data.couchbase.core.CouchbaseTemplate.handleWriteResultError(CouchbaseTemplate.java:208)
    at org.springframework.data.couchbase.core.CouchbaseTemplate.access$500(CouchbaseTemplate.java:97)
    at org.springframework.data.couchbase.core.CouchbaseTemplate$11.doInBucket(CouchbaseTemplate.java:658)
    at org.springframework.data.couchbase.core.CouchbaseTemplate$11.doInBucket(CouchbaseTemplate.java:603)
    at org.springframework.data.couchbase.core.CouchbaseTemplate.execute(CouchbaseTemplate.java:553)
    at org.springframework.data.couchbase.core.CouchbaseTemplate.doPersist(CouchbaseTemplate.java:603)
    at org.springframework.data.couchbase.core.CouchbaseTemplate.insert(CouchbaseTemplate.java:283)
    at org.springframework.data.couchbase.core.CouchbaseTemplate.insert(CouchbaseTemplate.java:277)
    at com.lws.orderauditservice.service.CouchbaseService.saveAudits(CouchbaseService.java:54)
    at com.lws.orderauditservice.service.KafkaOrderAuditConsumerService.receiveAuditData(KafkaOrderAuditConsumerService.java:28)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:189)
    at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:119)
    at org.springframework.kafka.listener.adapter.HandlerAdapter.invoke(HandlerAdapter.java:48)
    at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:283)
    at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:80)
    at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:51)
    at org.springframework.kafka.listener.adapter.RetryingMessageListenerAdapter.lambda$onMessage$0(RetryingMessageListenerAdapter.java:120)
    at org.springframework.retry.support.RetryTemplate.doExecute(RetryTemplate.java:287)
    at org.springframework.retry.support.RetryTemplate.execute(RetryTemplate.java:211)
    at org.springframework.kafka.listener.adapter.RetryingMessageListenerAdapter.onMessage(RetryingMessageListenerAdapter.java:114)
    at org.springframework.kafka.listener.adapter.RetryingMessageListenerAdapter.onMessage(RetryingMessageListenerAdapter.java:40)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeRecordListener(KafkaMessageListenerContainer.java:1141)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeWithRecords(KafkaMessageListenerContainer.java:1115)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java:1057)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:903)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:753)
Caused by: java.lang.RuntimeException: java.util.concurrent.TimeoutException: {"b":"LWS_ORDER_AUDIT","s":"kv","t":2500000,"i":"0x88"}
    at rx.exceptions.Exceptions.propagate(Exceptions.java:57)
    at rx.observables.BlockingObservable.blockForSingle(BlockingObservable.java:463)
    at rx.observables.BlockingObservable.single(BlockingObservable.java:340)
    at com.couchbase.client.java.CouchbaseBucket.insert(CouchbaseBucket.java:320)
    at com.couchbase.client.java.CouchbaseBucket.insert(CouchbaseBucket.java:315)
    at org.springframework.data.couchbase.core.CouchbaseTemplate$11.doInBucket(CouchbaseTemplate.java:634)
    ... 30 more
Caused by: java.util.concurrent.TimeoutException: {"b":"LWS_ORDER_AUDIT","s":"kv","t":2500000,"i":"0x88"}
    at com.couchbase.client.java.bucket.api.Utils$1.call(Utils.java:131)
    at com.couchbase.client.java.bucket.api.Utils$1.call(Utils.java:127)
    at rx.internal.operators.OperatorOnErrorResumeNextViaFunction$4.onError(OperatorOnErrorResumeNextViaFunction.java:140)
    at rx.internal.operators.OnSubscribeTimeoutTimedWithFallback$TimeoutMainSubscriber.onTimeout(OnSubscribeTimeoutTimedWithFallback.java:166)
    at rx.internal.operators.OnSubscribeTimeoutTimedWithFallback$TimeoutMainSubscriber$TimeoutTask.call(OnSubscribeTimeoutTimedWithFallback.java:191)
    at rx.internal.schedulers.ScheduledAction.run(ScheduledAction.java:55)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    ... 1 more

Я думал, что определение политики повтора для org.springframework.dao.TransientDataAccessException исключение решит мою проблему, но, похоже, это поможет.

Может кто-нибудь подсказать мне , как я могу решить все временные проблемы, связанные с сетью, в моем приложении и перейти к механизму повторных попыток ?

...