Удаление кэша гуавы не запускается - PullRequest
0 голосов
/ 01 марта 2020

У меня есть ситуация, когда некоторые события, которые мне нужно отслеживать, становятся "устаревшими" после определенного c таймаута (expiryTimeout). По истечении срока действия мне нужно «удалить» ссылки на событие с истекшим сроком действия, а также уведомить внешнюю службу о том, что событие истекло. Если до истечения срока действия получено более свежее событие для того же ключа, то тайм-аут «сбрасывается» (например, событие «обновляется»). В этом случае не нужно отправлять уведомление.

Я пытаюсь использовать кеш Guava для достижения вышеизложенного и не изобретать велосипед, но я должен что-то упустить, потому что, похоже, истечение срока действия быть запущенным.

У меня есть следующий код IO для создания «кэша»:

abstract sealed case class StaleTasksCleanerWithGuava private(srvc: ExternalService,
                                                              expiryTimeout: FiniteDuration)
                                                             (implicit cs: ContextShift[IO], evictionEs: ExecutorService) {
  implicit val logger = Slf4jLogger.getLogger[IO]

  private val removalListener = new RemovalListener[Key, TaskEvent]() {
    override def onRemoval(rn: RemovalNotification[Key, TaskEvent]): Unit = {
      val f = if(rn.wasEvicted()){
        val (appId, taskId) = rn.getKey
        logger.debug(s"[$appId-$taskId] Expiring entry now...") *>
          srvc.cleanUpStaleTask(appId, taskId) *>
          logger.debug(s"[$appId-$taskId] Entry expired.")
      } else
        IO.unit

      f.unsafeRunSync() // Necessary because of the way Guava's API is designed
    }
  }

  private val asyncRemovalListener = RemovalListeners.asynchronous(removalListener, evictionEs)

  private val c: Cache[Key, TaskEvent] = CacheBuilder.newBuilder()
    .expireAfterWrite(expiryTimeout.length, expiryTimeout.unit)
    .removalListener(asyncRemovalListener).build[Key, TaskEvent]()

  def putTask(e: TaskEvent): IO[Unit] = {
    val k = (e.appId, e.taskId)
    logger.debug(s"[${k._1}-${k._2}] Entry will expire in $expiryTimeout if not overridden.") *>
      IO(c.put(k, e))
  }
}

object StaleTasksCleanerWithGuava {
  type Key = (String, String)

  def create(srvc: ExternalService, expiryTimeout: FiniteDuration = 1.second)
            (implicit cs: ContextShift[IO], evictionEs: ExecutorService): IO[StaleTasksCleanerWithGuava] = {
    IO(new StaleTasksCleanerWithGuava(srvc, expiryTimeout) {})
  }
}

Наиболее примечательным из этого является то, что я использую val asyncRemovalListener = RemovalListeners.asynchronous(removalListener, evictionEs) для обеспечения что истечение срока и удаление выполняются асинхронно, а не как часть других операций кэша (чтение / запись).

Я создал следующий пример приложения для простого тестирования:

object TestAppWithGuava2 extends IOApp {
  override def run(args: List[String]): IO[ExitCode] = {
    implicit val logger = Slf4jLogger.getLogger[IO]

    val dummy = new ExternalService {
      override def cleanUpStaleTask(appId: String, taskId: String): IO[Unit] =
        logger.debug(s"[Service - ${appId}-${taskId}] Notified! +1")
    }
    val taskEvent1 = TaskEvent("A", "task1")
    val taskEvent2 = TaskEvent("B", "task2")
    val taskEvent3 = TaskEvent("C", "task1")

    val evictionEcResource = Resource.make(IO{Executors.newFixedThreadPool(8)})(tp => IO(tp.shutdownNow()).void)

    val program = evictionEcResource.use{ evictionEc =>
      for {
        cache <- StaleTasksCleanerWithGuava.create(dummy, 3.second)(contextShift, evictionEc)
        _ <- cache.putTask(taskEvent3)
        _ <- cache.putTask(taskEvent1)
        _ <- cache.putTask(taskEvent2)
        _ <- IO.never  /// This is here on purpose, as I want to see things being expired
      } yield ()
    }
    IO.race(program, IO.sleep(6.seconds)).as(ExitCode.Success)
  }
}

Однако это, к сожалению, не в состоянии сделать то, что я ожидаю. :) Когда я запускаю это, я получаю эти строки журнала, и через несколько секунд ничего не происходит, приложение завершается (так как IO.sleep(6.seconds) завершается и «выигрывает» race).

2020-03-01 18:48:41,862 [StaleTasksCleanerWithGuava] [C-task1] Entry will expire in 3 seconds if not overridden.
2020-03-01 18:48:41,874 [StaleTasksCleanerWithGuava] [A-task1] Entry will expire in 3 seconds if not overridden.
2020-03-01 18:48:41,874 [StaleTasksCleanerWithGuava] [B-task2] Entry will expire in 3 seconds if not overridden.

Чего мне не хватает? Спасибо

...