Запуск фоновой задачи для обновления кэша - PullRequest
0 голосов
/ 04 ноября 2019

Я создаю веб-сервер, используя Akka-HTTP. При получении запроса (по определенному маршруту) сервер вызывает REST API. Призыв к этому API довольно длинный. В настоящее время я кеширую результат, чтобы следующий запрос использовал кеш. Я хочу иметь фоновую задачу, которая периодически обновляет кэш (вызывая API). При получении запроса сервер будет использовать кэшированный результат (вместо того, чтобы вызывать API). Кеш будет обновляться только через фоновое задание.

Как мне это сделать? Я могу использовать модуль планирования Akka для периодического запуска задачи, но я не знаю, как обновить кэш после запуска задачи.

В настоящее время у меня есть что-то вроде этого:


val roster = Util.get_roster()
var pcache = new SRCache(roster)

val route = cache(lfuCache, keyerFunction)(
  pathSingleSlash {
    get {
      complete(
        HttpEntity(
          ContentTypes.`text/html(UTF-8)`,Views.index(pcache.get).toString))
    }
  }
)

pcache.get вызывает API (что довольно медленно), и я хочу заменить вызов API чем-то, что просто возвращает содержимое кэша и добавляет фоновую задачу для обновления кэша.

1 Ответ

2 голосов
/ 05 ноября 2019

Предполагается, что вы используете кэш из этого примера: https://doc.akka.io/docs/akka-http/current/common/caching.html.

import akka.http.caching.scaladsl.Cache
import akka.http.caching.scaladsl.CachingSettings
import akka.http.caching.LfuCache
import akka.http.scaladsl.server.RequestContext
import akka.http.scaladsl.server.RouteResult
import akka.http.scaladsl.model.Uri
import akka.http.scaladsl.server.directives.CachingDirectives._
import scala.concurrent.duration._

// Use the request's URI as the cache's key
val keyerFunction: PartialFunction[RequestContext, Uri] = {
  case r: RequestContext => r.request.uri
}
val defaultCachingSettings = CachingSettings(system)
val lfuCacheSettings =
  defaultCachingSettings.lfuCacheSettings
    .withInitialCapacity(25)
    .withMaxCapacity(50)
    .withTimeToLive(20.seconds)
    .withTimeToIdle(10.seconds)
val cachingSettings =
  defaultCachingSettings.withLfuCacheSettings(lfuCacheSettings)
val lfuCache: Cache[Uri, RouteResult] = LfuCache(cachingSettings)

// Create the route
val route = cache(lfuCache, keyerFunction)(innerRoute)

Ваше фоновое задание должно быть запланировано для обновления lfuCache. Вот интерфейс этого класса кэша, который вы можете использовать: https://doc.akka.io/api/akka-http/10.1.10/akka/http/caching/scaladsl/Cache.html.

Интересующие методы:

 abstract def get(key: K): Option[Future[V]]
 // Retrieves the future instance that is currently in the cache for the given key.

 abstract def getOrLoad(key: K, loadValue: (K) ⇒ Future[V]): Future[V]
 // Returns either the cached Future for the given key, 
 // or applies the given value loading function on the key, producing a Future[V].

 abstract def put(key: K, mayBeValue: Future[V])
                 (implicit ex: ExecutionContext): Future[V]
 // Cache the given future if not cached previously.

Это интерфейс планировщика, который вы можете использовать:

https://doc.akka.io/docs/akka/current/scheduler.html

val cancellable =
  system.scheduler.schedule(0 milliseconds, 5 seconds, ...)

Ваш планировщик будет вызывать lfuCache.put(...) каждые n секунд и обновлять кэш.

Затем ваш код может следовать одному из следующих шаблонов:

  1. Используйте кэшированный маршрут, как вы уже делаете с: val route = cache(lfuCache, keyerFunction)(....
  2. Или просто позвоните lfuCache.get(key) или lfuCache.getOrLoad(...) без использования директив кэширования dsl (без этого: cache(lfuCache,...).

Если вы используете класс Cache непосредственно для ввода и извлечения значений, рассмотрите возможность использования более простых ключей вместо URI значений.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...