Я использую публичный c API с реактивным WebClient. Я хочу вызывать API и сохранять самые новые данные оттуда через каждые 1 час. Проблема возникает, когда я добавляю метод .repeatWhen()
в поток Flux
, как показано ниже:
@Service
public class Covid19APIService {
@Value("${rapid-API-URL}")
private String covidAPIURL;
@Value("${x-rapidapi-host}")
private String covidAPI;
@Value("${x-rapidapi-key}")
private String covidAPIKey;
private WebClient buildWebClient() {
return WebClient.builder()
.baseUrl(covidAPIURL)
.exchangeStrategies(ExchangeStrategies.builder().codecs(this::acceptedCodecs).build())
.defaultHeaders(
httpHeaders -> {
httpHeaders.add("x-rapidapi-host", covidAPI);
httpHeaders.add("x-rapidapi-key", covidAPIKey);
})
.build();
}
public Flux<CountryCasesHistoryWrapper> findCasesHistoryForCountryAndDate1(
String country, String date) {
return buildWebClient()
.get()
.uri(
builder ->
builder
.path("/history_by_particular_country_by_date.php")
.queryParam("country", country)
.queryParam("date", date)
.build())
.retrieve()
.onStatus(
HttpStatus::is4xxClientError,
response -> error(new InfrastructureException("Covid19 public API not found")))
.onStatus(
HttpStatus::is4xxClientError,
response -> error(new InfrastructureException("Covid19 public API server exception")))
.bodyToMono(CountryCasesHistoryWrapper.class)
.repeatWhen(interval -> Flux.interval(Duration.ofMinutes(10)))
.single()
.repeatWhen(interval -> Flux.interval(Duration.ofSeconds(30))).delayElements(Duration.ofSeconds(10));
}
private void acceptedCodecs(ClientCodecConfigurer clientCodecConfigurer) {
clientCodecConfigurer
.customCodecs()
.register(new Jackson2JsonEncoder(new ObjectMapper(), TEXT_HTML));
clientCodecConfigurer
.customCodecs()
.register(new Jackson2JsonDecoder(new ObjectMapper(), TEXT_HTML));
}
}
и метод обработчика:
public Mono<ServerResponse> getCasesHistoryForCountryAndDate1(ServerRequest serverRequest) {
String country =
serverRequest
.queryParam("country")
.orElseThrow(
() -> new InfrastructureException("Country parameter is required for this action"));
String date =
serverRequest
.queryParam("date")
.orElseThrow(
() -> new InfrastructureException("Date parameter is required for this action"));
final Flux<CountryCasesHistoryWrapper> casesHistoryForCountryAndDate =
apiService.findCasesHistoryForCountryAndDate1(country, date);
return casesHistoryForCountryAndDate
.flatMap(
countryCasesHistoryWrapper ->
repository.saveAll(countryCasesHistoryWrapper.getStatsByCountry()))
.collectList()
.flatMap(
countryCasesHistories ->
ServerResponse.ok()
.contentType(MediaType.APPLICATION_JSON)
.body(BodyInserters.fromValue(countryCasesHistories)));
}
Вышеприведенная реализация приводит к исключению:
reactor.core.Exceptions$OverflowException: Could not emit tick 0 due to lack of requests (interval doesn't support small downstream requests that replenish slower than the ticks)
at reactor.core.Exceptions.failWithOverflow(Exceptions.java:231) ~[reactor-core-3.3.3.RELEASE.jar:3.3.3.RELEASE]
Я заметил похожий вопрос click , но, несмотря на добавление delayElements()
, исключение метода все еще происходит. Здесь возникает вопрос, как исправить текущую реализацию и достичь желаемой цели, которая вызывает API каждый час и получать оттуда самые новые данные. Буду благодарен за все предложения.