Потокобезопасный реактивный кеш - PullRequest
0 голосов
/ 11 июня 2019

Я всегда зацикливаюсь на реактивном мышлении, когда мне приходится начинать делать несколько внешних вызовов. Сейчас мне поручено реализовать следующий сценарий с использованием Reactor и Webflux.

  1. Получить токен из кеша
    1. Токен не найден
      1. Запрос от системы авторизации нового токена
      2. Сохранять токен в кеше
    2. Жетон найден
      1. Использовать токен для запроса к внешней системе
        1. Внешняя система отвечает 2xx
          1. Обратный ответ
        2. Внешняя система отвечает 403
          1. Неверный токен в кеше
          2. Повторить весь поток сверху

Пока у меня есть только пример того, как я хотел бы использовать кеш:

  public static Mono<ServerResponse> doRequest(
      Function<Mono<Token>, Mono<ClientResponse>> clientRequest,
      Function<Mono<ClientResponse>, Mono<ServerResponse>> errorHandler
  ) {
    Token token = null;
    return token.getToken()
        .transform(clientRequest)
        .flatMap(clientResponse -> {
              if (clientResponse.statusCode().value() == 403) {
                // HOW WOULD I INVALIDATE ANYTHING HERE? Or where should i do it?
                return Mono.error(new TokenExpired("Token expired"));
              }
              return Mono.just(clientResponse);
        })
        .transform(errorHandler)
        .retryWhen(companion -> {
      return companion.zipWith(Flux.range(1,4),
          (error, index) -> {
            if (index < 4 && (error instanceof TokenExpired)) {
              return index;
            } else {
              throw Exceptions.propagate(error);
            }
          });
    });
  }

Я посмотрел на addons кеш, но не смог понять, как вручную сделать кеш недействительным, поскольку он был основан только на времени? Кроме того, при использовании кэша кофеина поведение аннулирования не определено в многопоточной среде. Я чувствую, что мой вариант использования является стандартным, но я не смог найти какие-либо шаблоны для этого.

Две проблемы, с которыми я застрял: Убедиться, что обновление кэша происходит только один раз и не является блокирующим. Как сделать запись в кэше недействительной неблокирующим способом.

Действительно, я застрял в том, как подходить к проблеме реактивно, принятый ответ не обязательно должен быть Reactor, это может быть любая реактивная библиотека, показывающая мышление, необходимое для решения проблемы.

1 Ответ

1 голос
/ 11 июня 2019

Кофеин откажется от записи, если вычисление не выполнено или будет иметь нулевое значение, и распространит ее вызывающей стороне.Для AsyncCache он немедленно сохраняет будущее, а обратный вызов whenComplete при необходимости обрабатывает ошибки.Поскольку Caffeine работает с CompleteableFuture, вы можете использовать моно / будущие конвертеры Reactor.

AsyncCache<Token, ServerResponse> responseCache = Caffeine.newBuilder().buildAsync();

public static Mono<ServerResponse> doRequest(
    Function<Mono<Token>, Mono<ClientResponse>> clientRequest,
    Function<Mono<ClientResponse>, Mono<ServerResponse>> errorHandler) {
  Token token = // ...
  Mono<ServerResponse> result = Mono.fromFuture(() -> token.getToken()
      .transform(t -> cache.get(t, (token, executor) -> {
        Mono<ServerResponse> response = clientRequest.apply(token);
        return (response.statusCode().value() == HttpServletResponse.SC_FORBIDDEN)
            ? Mono.error(new TokenExpired("Token expired")).toFuture()
            : Mono.just(clientResponse).toFuture();
      })));
  return result.transform(errorHandler).retryWhen(...);
}

cache.get(key, func) вставит будущее только в том случае, если сопоставление отсутствует (просроченная или собранная запись ожидает очисткирассматривается как отсутствие).Это блокирует только на время будущего возвращения в кэш, который должен быть дешевым, если вся работа будет завершена в будущем.Любые другие запросы будут ожидать установления сопоставления ключ-> будущее и возвращаться в том же будущем, пока это сопоставление не будет удалено (не выполнено или исключено).

Возможно, можно написать универсальный адаптер AsyncCache Реактору на основе этой идиомы.

...