Получение не может выдать тик 0 из-за отсутствия запросов (интервал не поддерживает небольшие нисходящие запросы, которые повторяются на sh медленнее, чем тики) - PullRequest
0 голосов
/ 16 апреля 2020

Я использую публичный c API с реактивным WebClient. Я хочу вызывать API и сохранять самые новые данные оттуда через каждые 1 час. Проблема возникает, когда я добавляю метод .repeatWhen() в поток Flux, как показано ниже:

@Service
public class Covid19APIService {

  @Value("${rapid-API-URL}")
  private String covidAPIURL;

  @Value("${x-rapidapi-host}")
  private String covidAPI;

  @Value("${x-rapidapi-key}")
  private String covidAPIKey;

  private WebClient buildWebClient() {
    return WebClient.builder()
        .baseUrl(covidAPIURL)
        .exchangeStrategies(ExchangeStrategies.builder().codecs(this::acceptedCodecs).build())
        .defaultHeaders(
            httpHeaders -> {
              httpHeaders.add("x-rapidapi-host", covidAPI);
              httpHeaders.add("x-rapidapi-key", covidAPIKey);
            })
        .build();
  }

  public Flux<CountryCasesHistoryWrapper> findCasesHistoryForCountryAndDate1(
      String country, String date) {

     return buildWebClient()
        .get()
        .uri(
            builder ->
                builder
                    .path("/history_by_particular_country_by_date.php")
                    .queryParam("country", country)
                    .queryParam("date", date)
                    .build())
        .retrieve()
        .onStatus(
            HttpStatus::is4xxClientError,
            response -> error(new InfrastructureException("Covid19 public API not found")))
        .onStatus(
            HttpStatus::is4xxClientError,
            response -> error(new InfrastructureException("Covid19 public API server exception")))
        .bodyToMono(CountryCasesHistoryWrapper.class)
             .repeatWhen(interval -> Flux.interval(Duration.ofMinutes(10)))
             .single()
          .repeatWhen(interval -> Flux.interval(Duration.ofSeconds(30))).delayElements(Duration.ofSeconds(10));
  }

  private void acceptedCodecs(ClientCodecConfigurer clientCodecConfigurer) {
    clientCodecConfigurer
        .customCodecs()
        .register(new Jackson2JsonEncoder(new ObjectMapper(), TEXT_HTML));
    clientCodecConfigurer
        .customCodecs()
        .register(new Jackson2JsonDecoder(new ObjectMapper(), TEXT_HTML));
  }
}

и метод обработчика:

public Mono<ServerResponse> getCasesHistoryForCountryAndDate1(ServerRequest serverRequest) {
    String country =
        serverRequest
            .queryParam("country")
            .orElseThrow(
                () -> new InfrastructureException("Country parameter is required for this action"));

    String date =
        serverRequest
            .queryParam("date")
            .orElseThrow(
                () -> new InfrastructureException("Date parameter is required for this action"));

    final Flux<CountryCasesHistoryWrapper> casesHistoryForCountryAndDate =
        apiService.findCasesHistoryForCountryAndDate1(country, date);

    return casesHistoryForCountryAndDate
        .flatMap(
            countryCasesHistoryWrapper ->
                repository.saveAll(countryCasesHistoryWrapper.getStatsByCountry()))
        .collectList()
        .flatMap(
            countryCasesHistories ->
                ServerResponse.ok()
                    .contentType(MediaType.APPLICATION_JSON)
                    .body(BodyInserters.fromValue(countryCasesHistories)));
}

Вышеприведенная реализация приводит к исключению:

reactor.core.Exceptions$OverflowException: Could not emit tick 0 due to lack of requests (interval doesn't support small downstream requests that replenish slower than the ticks)
    at reactor.core.Exceptions.failWithOverflow(Exceptions.java:231) ~[reactor-core-3.3.3.RELEASE.jar:3.3.3.RELEASE]

Я заметил похожий вопрос click , но, несмотря на добавление delayElements(), исключение метода все еще происходит. Здесь возникает вопрос, как исправить текущую реализацию и достичь желаемой цели, которая вызывает API каждый час и получать оттуда самые новые данные. Буду благодарен за все предложения.

...