Неожиданное поведение с asyn c и повторная попытка весенней загрузки - PullRequest
1 голос
/ 02 августа 2020

У меня есть вариант использования для параллельного вызова нескольких REST APIS. Но уловка здесь в том, что им нужен токен доступа, который будет передаваться в заголовке каждого запроса. Итак, я создал эту переменную уровня класса. Также существует повторная попытка в случае тайм-аута или истекшего токена. В случае истекшего токена мы вызываем метод loadToken (), чтобы получить новый токен и повторить тот же вызов. у нас есть максимальное количество повторных попыток - 3 раза.

Проблема:

  1. Я вижу одну проблему: loadToken () может быть вызван несколькими потоками одновременно, и может быть состояние гонки здесь.
  2. Я наблюдаю очень странное поведение в prd. Даже для 500 ошибок повторная попытка отбрасывает метод loadToken (), чего не должно происходить. Только loadToken () должен срабатывать в случае просроченного токена.

Здесь есть какие-либо мысли / предложения по улучшению этого кода. Спасибо за любую помощь.

Код выглядит так:

@Component
public class MyDaoImpl implements MyDao {
    
    @Autowired
    private RestTemplate restTemplate;
    
    private String token;

    @PostConstruct
    public void loadToken() throws CustomException {
        HttpHeaders headers = new HttpHeaders();
        headers.setContentType(MediaType.APPLICATION_FORM_URLENCODED);
        MultiValueMap<String, String> map= new LinkedMultiValueMap<>();
        HttpEntity<MultiValueMap<String, String>> request = new HttpEntity<>(map, headers);
        try {
            ResponseEntity<TokenResponse> exchange = restTemplate.exchange(tokenUrl, HttpMethod.POST, request, TokenResponse.class);
            token = exchange.getBody()!=null && StringUtils.isNotBlank(exchange.getBody().getToken()) ? exchange.getBody().getToken() : null;
        } catch (Exception e) {
            throw e; 
        }
    }

    @Override
    @Async("threadPoolExecutor")
    @Retryable(maxAttemptsExpression = "#{${retry.max.attempts:3}}", value = {UnAuthorizedException.class, GatewayTimeoutException.class}, backoff = @Backoff(delayExpression = "#{${retry.backoff.period:1000}}"))
    public CompletableFuture<String> getResponse1(String string) throws CustomException {
        HttpHeaders headers = new HttpHeaders();
        headers.setBearerAuth(getAccessToken());
        HttpEntity<String> request = new HttpEntity<>(headers);
        try {
            ResponseEntity<byte[]> exchange = restTemplate.exchange(url, HttpMethod.GET, request, byte[].class);
            if(exchange.getBody()==null) return CompletableFuture.completedFuture(null);
            return CompletableFuture.completedFuture(Base64.getEncoder().encodeToString(exchange.getBody()));
        } catch (HttpClientErrorException | HttpServerErrorException e) {
            if(e.getStatusCode()==HttpStatus.UNAUTHORIZED) {
                loadToken();
                throw new UnAuthorizedException("Expired Token", e);
            }
        } catch (Exception e) {
            if(StringUtils.isNotEmpty(e.getMessage()) && (e.getMessage().contains("I/O error on ") || e.getMessage().contains("timeout"))) {
                throw new GatewayTimeoutException("Gateway_Timeout", e);
            }
        }
        return CompletableFuture.completedFuture(null);
    }

    @Override
    @Async("threadPoolExecutor")
    @Retryable(maxAttemptsExpression = "#{${retry.max.attempts:3}}", value = {UnAuthorizedException.class, GatewayTimeoutException.class}, backoff = @Backoff(delayExpression = "#{${retry.backoff.period:1000}}"))
    public CompletableFuture<SomeResponse> getResponse2(String string) throws CustomException {
        HttpHeaders headers = new HttpHeaders();
        headers.setBearerAuth(getAccessToken());
        HttpEntity<String> request = new HttpEntity<>(headers);
        try {
            final ResponseEntity<Response> exchange = restTemplate.exchange(someurl, HttpMethod.GET, request, Response.class);
            return CompletableFuture.completedFuture(exchange.getBody());
        } catch (HttpClientErrorException | HttpServerErrorException e) {
            if(e.getStatusCode()==HttpStatus.UNAUTHORIZED) {
                loadToken();
                throw new UnAuthorizedException("Expired Token", e);
            }
        } catch (Exception e) {
            if(StringUtils.isNotEmpty(e.getMessage()) && (e.getMessage().contains("I/O error on ") || e.getMessage().contains("timeout"))) {
                throw new GatewayTimeoutException("Gateway_Timeout", e);
            }
        }
        return CompletableFuture.completedFuture(null);
    }

    @Override
    public String getAccessToken() throws JbException {
        if(StringUtils.isBlank(token)) {
            loadToken();
        }
        return token;
    }

    @Override
    @Async("threadPoolExecutor")
    @Retryable(maxAttemptsExpression = "#{${retry.max.attempts:3}}", value = {UnAuthorizedException.class, GatewayTimeoutException.class}, backoff = @Backoff(delayExpression = "#{${retry.backoff.period:1000}}"))
    public CompletableFuture<List<Response3>> getResponse3(String string) throws CustomException {
        HttpHeaders headers = new HttpHeaders();
        headers.setContentType(MediaType.APPLICATION_JSON);
        headers.setBearerAuth(getAccessToken());
        HttpEntity<String> request = new HttpEntity<>("{}", headers);
        try {
            ResponseEntity<List<CobrandCard>> exchange = restTemplate.exchange(someUrl2, HttpMethod.POST, request, new ParameterizedTypeReference<List<Response3>>() {});
            return CompletableFuture.completedFuture(exchange.getBody());
        } catch (HttpClientErrorException | HttpServerErrorException e) {
            if(e.getStatusCode()==HttpStatus.UNAUTHORIZED) {
                loadToken();
                throw new UnAuthorizedException("Expired Token", e);
            }
        } catch (Exception e) {
            if(StringUtils.isNotEmpty(e.getMessage()) && (e.getMessage().contains("I/O error on ") || e.getMessage().contains("timeout"))) {
                throw new GatewayTimeoutException("Gateway_Timeout", e);
            }
        }
        return CompletableFuture.completedFuture(null);
    }

    @Override
    @Async("threadPoolExecutor")
    @Retryable(maxAttemptsExpression = "#{${retry.max.attempts:3}}", value = {UnAuthorizedException.class, GatewayTimeoutException.class}, backoff = @Backoff(delayExpression = "#{${retry.backoff.period:1000}}"))
    public CompletableFuture<List<Response4>> getTier(String string) throws CustomException {
        HttpHeaders headers = new HttpHeaders();
        headers.setContentType(MediaType.APPLICATION_JSON);
        headers.setBearerAuth(getAccessToken());
        HttpEntity<String> request = new HttpEntity<>(headers);
        try {
            ResponseEntity<List<Response4>> exchange = restTemplate.exchange(finalUrl, HttpMethod.GET, request, new ParameterizedTypeReference<List<Response4>>() {});
            return CompletableFuture.completedFuture(exchange.getBody());
        } catch (HttpClientErrorException | HttpServerErrorException e) {
            if(e.getStatusCode()==HttpStatus.UNAUTHORIZED) {
                loadToken();
                throw new UnAuthorizedException("Expired Token", e);
            }
        } catch (Exception e) {
            if(StringUtils.isNotEmpty(e.getMessage()) && (e.getMessage().contains("I/O error on ") || e.getMessage().contains("timeout"))) {
                throw new GatewayTimeoutException("Gateway_Timeout", e);
            }
        }
        return CompletableFuture.completedFuture(null);
    }

}
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...