У меня есть ситуация, когда некоторые события, которые мне нужно отслеживать, становятся "устаревшими" после определенного c таймаута (expiryTimeout
). По истечении срока действия мне нужно «удалить» ссылки на событие с истекшим сроком действия, а также уведомить внешнюю службу о том, что событие истекло. Если до истечения срока действия получено более свежее событие для того же ключа, то тайм-аут «сбрасывается» (например, событие «обновляется»). В этом случае не нужно отправлять уведомление.
Я пытаюсь использовать кеш Guava для достижения вышеизложенного и не изобретать велосипед, но я должен что-то упустить, потому что, похоже, истечение срока действия быть запущенным.
У меня есть следующий код IO
для создания «кэша»:
abstract sealed case class StaleTasksCleanerWithGuava private(srvc: ExternalService,
expiryTimeout: FiniteDuration)
(implicit cs: ContextShift[IO], evictionEs: ExecutorService) {
implicit val logger = Slf4jLogger.getLogger[IO]
private val removalListener = new RemovalListener[Key, TaskEvent]() {
override def onRemoval(rn: RemovalNotification[Key, TaskEvent]): Unit = {
val f = if(rn.wasEvicted()){
val (appId, taskId) = rn.getKey
logger.debug(s"[$appId-$taskId] Expiring entry now...") *>
srvc.cleanUpStaleTask(appId, taskId) *>
logger.debug(s"[$appId-$taskId] Entry expired.")
} else
IO.unit
f.unsafeRunSync() // Necessary because of the way Guava's API is designed
}
}
private val asyncRemovalListener = RemovalListeners.asynchronous(removalListener, evictionEs)
private val c: Cache[Key, TaskEvent] = CacheBuilder.newBuilder()
.expireAfterWrite(expiryTimeout.length, expiryTimeout.unit)
.removalListener(asyncRemovalListener).build[Key, TaskEvent]()
def putTask(e: TaskEvent): IO[Unit] = {
val k = (e.appId, e.taskId)
logger.debug(s"[${k._1}-${k._2}] Entry will expire in $expiryTimeout if not overridden.") *>
IO(c.put(k, e))
}
}
object StaleTasksCleanerWithGuava {
type Key = (String, String)
def create(srvc: ExternalService, expiryTimeout: FiniteDuration = 1.second)
(implicit cs: ContextShift[IO], evictionEs: ExecutorService): IO[StaleTasksCleanerWithGuava] = {
IO(new StaleTasksCleanerWithGuava(srvc, expiryTimeout) {})
}
}
Наиболее примечательным из этого является то, что я использую val asyncRemovalListener = RemovalListeners.asynchronous(removalListener, evictionEs)
для обеспечения что истечение срока и удаление выполняются асинхронно, а не как часть других операций кэша (чтение / запись).
Я создал следующий пример приложения для простого тестирования:
object TestAppWithGuava2 extends IOApp {
override def run(args: List[String]): IO[ExitCode] = {
implicit val logger = Slf4jLogger.getLogger[IO]
val dummy = new ExternalService {
override def cleanUpStaleTask(appId: String, taskId: String): IO[Unit] =
logger.debug(s"[Service - ${appId}-${taskId}] Notified! +1")
}
val taskEvent1 = TaskEvent("A", "task1")
val taskEvent2 = TaskEvent("B", "task2")
val taskEvent3 = TaskEvent("C", "task1")
val evictionEcResource = Resource.make(IO{Executors.newFixedThreadPool(8)})(tp => IO(tp.shutdownNow()).void)
val program = evictionEcResource.use{ evictionEc =>
for {
cache <- StaleTasksCleanerWithGuava.create(dummy, 3.second)(contextShift, evictionEc)
_ <- cache.putTask(taskEvent3)
_ <- cache.putTask(taskEvent1)
_ <- cache.putTask(taskEvent2)
_ <- IO.never /// This is here on purpose, as I want to see things being expired
} yield ()
}
IO.race(program, IO.sleep(6.seconds)).as(ExitCode.Success)
}
}
Однако это, к сожалению, не в состоянии сделать то, что я ожидаю. :) Когда я запускаю это, я получаю эти строки журнала, и через несколько секунд ничего не происходит, приложение завершается (так как IO.sleep(6.seconds)
завершается и «выигрывает» race
).
2020-03-01 18:48:41,862 [StaleTasksCleanerWithGuava] [C-task1] Entry will expire in 3 seconds if not overridden.
2020-03-01 18:48:41,874 [StaleTasksCleanerWithGuava] [A-task1] Entry will expire in 3 seconds if not overridden.
2020-03-01 18:48:41,874 [StaleTasksCleanerWithGuava] [B-task2] Entry will expire in 3 seconds if not overridden.
Чего мне не хватает? Спасибо