Кэш не работает должным образом с использованием реактора и кофеина - PullRequest
0 голосов
/ 15 марта 2019

Мне нужно проверять некоторые конечные точки через разные интервалы, поэтому я настроил построитель кэша Caffeine.

this.localeWeatherCache = newBuilder().build();

this.currentWeatherCache=newBuilder().expireAfterWrite(Duration.ofHours(3)).build();

this.weatherForecastsCache = newBuilder().expireAfterWrite(Duration.ofHours(12)).build();

В моем Сервисе я вызываю эти 3 конечные точки, в конце я возвращаю свой объект со всеми деталями, используя Mono.zip ().

Во время моих тестов я заметил, что climaTempoRepository.findLocaleByCityNameAndState выполняется дважды и после истечения срока действия кэша currentWeather делает еще один вызов конечной точки locale , то же самое происходит с weatherForecast, он снова вызывает locale .

Почему это не получается? Разве это не должно использовать кеш? Или я поступил неправильно?

Любая помощь или указатели очень ценятся! :)

public Mono<Weather> weatherForecastByLocation(Location location) {

    Mono<ClimaTempoLocale> locale =
            CacheMono.lookup(key ->
                    Mono.justOrEmpty(localeWeatherCache.getIfPresent(key))
                            .map(Signal::next), location)
                    .onCacheMissResume(() -> climaTempoRepository.findLocaleByCityNameAndState(location.city(), location.state()))
                    .andWriteWith((key, signal) -> Mono.fromRunnable(() ->
                            Optional.ofNullable(signal.get())
                                    .ifPresent(value -> localeWeatherCache.put(key, value))));

    Mono<CurrentWeather> currentWeather =
            CacheMono.lookup(key ->
                    Mono.justOrEmpty(currentWeatherCache.getIfPresent(key))
                            .map(Signal::next), location)
                    .onCacheMissResume(() -> locale.flatMap(climaTempoRepository::findCurrentWeatherByLocale)
                            .subscribeOn(Schedulers.elastic()))
                    .andWriteWith((key, signal) -> Mono.fromRunnable(() ->
                            Optional.ofNullable(signal.get())
                                    .ifPresent(value -> currentWeatherCache.put(key, value))));

    Mono<WeatherForecasts> weatherForecasts =
            CacheMono.lookup(key ->
                    Mono.justOrEmpty(weatherForecastsCache.getIfPresent(key))
                            .map(Signal::next), location)
                    .onCacheMissResume(() -> locale.flatMap(climaTempoRepository::findDailyForecastByLocale)
                            .subscribeOn(Schedulers.elastic()))
                    .andWriteWith((key, signal) -> Mono.fromRunnable(() ->
                            Optional.ofNullable(signal.get())
                                    .ifPresent(value -> weatherForecastsCache.put(key, value))));

    return Mono.zip(currentWeather,
            weatherForecasts,
            (current, forecasts) ->
                    Weather.buildWith(builder -> {
                        builder.location = location;
                        builder.currentWeather = current;
                        builder.weatherForecasts = forecasts;
                    }));

}

Ответы [ 2 ]

1 голос
/ 16 марта 2019

Как показано здесь https://stackoverflow.com/a/52803247/11209784 ClimaTempoLocale может быть вычислено следующим образом:

Cache<Location, ClimaTempoLocale> weatherLocaleCache = Caffeine.newBuilder().build();

private Mono<ClimaTempoLocale> findLocale(Location location) {
    Mono<ClimaTempoLocale> locale;
    ClimaTempoLocale cachedLocale = weatherLocaleCache.getIfPresent(location);
    if (cachedLocale != null) {
        locale = Mono.just(cachedLocale);
    } else {
        locale = climaTempoRepository.findLocaleByCityNameAndState(location.city(), location.state())
                .doOnNext(climaTempoLocale -> weatherLocaleCache.put(location, climaTempoLocale));
    }

    return locale;
}

Одним из побочных эффектов является то, что могут быть последовательные записи в один и тот же ключ, когда одновременные вызовы приводят к отсутствию кэша.

При этом вызовы, зависящие от ClimaTempoLocale, могут продолжаться так же:

Cache<Location, CurrentWeather> currentWeatherCache = Caffeine.newBuilder().expireAfterWrite(Duration.ofHours(3)).build();

Cache<Location, WeatherForecasts> weatherForecastsCache = Caffeine.newBuilder().expireAfterWrite(Duration.ofHours(12)).build();

public Mono<Weather> weatherForecastByLocation(Location location) {
    Mono<ClimaTempoLocale> locale = findLocale(location);

    Mono<CurrentWeather> currentWeather =
            CacheMono.lookup(
                    key -> Mono.justOrEmpty(currentWeatherCache.getIfPresent(key))
                            .map(Signal::next),
                    location)
                    .onCacheMissResume(
                            () -> locale.flatMap(climaTempoRepository::findCurrentWeatherByLocale)
                                    .subscribeOn(Schedulers.elastic()))
                    .andWriteWith(
                            (key, signal) -> Mono.fromRunnable(
                                    () -> Optional.ofNullable(signal.get())
                                            .ifPresent(value -> currentWeatherCache.put(key, value))));

    Mono<WeatherForecasts> weatherForecasts =
            CacheMono.lookup(
                    key -> Mono.justOrEmpty(weatherForecastsCache.getIfPresent(key))
                            .map(Signal::next),
                    location)
                    .onCacheMissResume(
                            () -> locale.flatMap(climaTempoRepository::findDailyForecastByLocale)
                                    .subscribeOn(Schedulers.elastic()))
                    .andWriteWith(
                            (key, signal) -> Mono.fromRunnable(
                                    () -> Optional.ofNullable(signal.get())
                                            .ifPresent(value -> weatherForecastsCache.put(key, value))));

    return Mono.zip(currentWeather,
            weatherForecasts,
            (current, forecasts) ->
                    Weather.buildWith(builder -> {
                        builder.location = location;
                        builder.currentWeather = current;
                        builder.weatherForecasts = forecasts;
                    }));
}
1 голос
/ 16 марта 2019

A AsyncLoadingCache может вычислить значение из ключа и возвращает CompletableFuture результата. Это может быть переведено в Mono это fromFuture метод. Это обеспечит выполнение только одного выполнения для данного ключа без блокирования из-за хранения фьючерсов в кэше.

AsyncLoadingCache<Location, ClimaTempoLocale> localeWeatherCache = 
    Caffeine.newBuilder().buildAsync(location -> 
        climaTempoRepository.findLocaleByCityNameAndState(location.city(), location.state()));

AsyncLoadingCache<ClimaTempoLocale, CurrentWeather> currentWeatherCache =
    Caffeine.newBuilder().buildAsync(climaTempoRepository::findCurrentWeatherByLocale);

AsyncLoadingCache<ClimaTempoLocale, WeatherForecasts> weatherForecastsCache =
    Caffeine.newBuilder().buildAsync(climaTempoRepository::findDailyForecastByLocale);

public Mono<Weather> weatherForecastByLocation(Location location) {
  var locale = Mono.fromFuture(localeWeatherCache.get(location));
  var currentWeather = Mono.fromFuture(locale.map(localeWeatherCache::get));
  var weatherForecasts = Mono.fromFuture(locale.map(weatherForecastsCache::get));

  return Mono.zip(currentWeather, weatherForecasts, (current, forecasts) ->
      Weather.buildWith(builder -> {
          builder.location = location;
          builder.currentWeather = current;
          builder.weatherForecasts = forecasts;
      }));
}
...