Как правильно запустить сопрограммы Kotlin для реализации Caynine AsyncLoadingCache? - PullRequest
1 голос
/ 21 марта 2019

У меня есть серверное приложение Kotlin JVM, использующее сопрограммы, и мне нужно поместить кеш перед неблокирующим сетевым вызовом.Я полагаю, что могу использовать кофеин AsyncLoadingCache, чтобы получить неблокирующее поведение кэша, которое мне нужно.Интерфейс AsyncCacheLoader, который мне нужно реализовать, использует CompletableFuture.Между тем, метод, который я хочу вызвать для загрузки записей кэша, является функцией suspend.

Я могу сократить разрыв следующим образом:

abstract class SuspendingCacheLoader<K, V>: AsyncCacheLoader<K, V> {
    abstract suspend fun load(key: K): V

    final override fun asyncLoad(key: K, executor: Executor): CompletableFuture<V> {
        return GlobalScope.async(executor.asCoroutineDispatcher()) {
            load(key)
        }.asCompletableFuture()
    }
}

Это запустит loadфункция на предоставленном Executor (по умолчанию ForkJoinPool), что с точки зрения кофеина является правильным поведением.

Однако я знаю, что должен попытаться избегать использованияGlobalScope для запуска сопрограмм .

Я подумал о том, чтобы SuspendingCacheLoader реализовал CoroutineScope и управлял собственным контекстом сопрограмм.Но CoroutineScope предназначен для реализации объектами с управляемым жизненным циклом.Ни у кеша, ни у AsyncCacheLoader нет никаких хуков жизненного цикла.Кеш принадлежит экземплярам Executor и CompletableFuture, поэтому он уже таким образом контролирует жизненный цикл задач загрузки.Я не вижу, чтобы наличие задач, принадлежащих контексту сопрограммы, могло бы что-то добавить, и я беспокоюсь, что не смогу правильно закрыть контекст сопрограммы после того, как кэш перестал использоваться.

Написание собственного механизма асинхронного кэширования было бы непосильно трудным, поэтому я бы хотел интегрироваться с реализацией Caffeine, если смогу.

Использует ли GlobalScope правильный подход для реализации AsyncCacheLoader, или естьлучшее решение?

Ответы [ 3 ]

3 голосов
/ 21 марта 2019

Кеш принадлежит экземплярам Executor и CompletableFuture, поэтому он уже контролирует жизненный цикл задач загрузки таким образом.

Это не так, в документации по Caffeine указаночто он использует предоставленные пользователем Executor или ForkJoinPool.commonPool(), если ничего не указано.Это означает, что жизненный цикл по умолчанию отсутствует.

Независимо от прямого вызова GlobalScope кажется неправильным решением, поскольку нет причин жестко кодировать выбор.Просто укажите CoroutineScope через конструктор и используйте GlobalScope в качестве аргумента, пока у вас нет явного жизненного цикла для привязки кеша.

0 голосов
/ 06 июня 2019

Вот мое решение:

Определить функцию расширения CoroutineVerticle

fun <K, V> CoroutineVerticle.buildCache(configurator: Caffeine<Any, Any>.() -> Unit = {}, loader: suspend CoroutineScope.(K) -> V) = Caffeine.newBuilder().apply(configurator).buildAsync { key: K, _ ->
    // do not use cache's executor
    future {
        loader(key)
    }
}

Создайте наш кеш в CoroutineVerticle

val cache : AsyncLoadingCache<String, String> = buildCache({
  maximumSize(10_000)
  expireAfterWrite(10, TimeUnit.MINUTES)
}) { key ->
    // load data and return it
    delay(1000)
    "data for key: $key"
}

Использовать кеш

suspend fun doSomething() {
    val data = cache.get('key').await()

    val future = cache.get('key2')
    val data2 = future.await()
}
0 голосов
/ 31 марта 2019

После некоторых размышлений я придумал другое решение, которое унаследует любой контекст, используемый для вызова метода get, вместо того, чтобы полагаться на фиксированный контекст, созданный с нуля или переданный загрузчику кэша.

Подход работает с использованием AsyncCache.get вместо AsyncCacheLoader.Чтобы получить текущий контекст, я использую глобальное свойство coroutineContext.

suspend fun <K: Any, V> AsyncCache.getSuspending(key: K): V {
    val outerContext = coroutineContext
    return get(key) { k, executor ->
        val innerContext = outerContext + Job() + executor.asCoroutineDispatcher()
        CoroutineScope(innerContext).async {
            loadValue(k) // loadValue is a suspend function defined elsewhere
        }.asCompletableFuture()
    }.await()
}

Контекст, используемый для вызова loadValue, состоит из трех вещей:

  • Исходный контекст, который использовался для вызова getSuspending
  • Новый Job(), который переопределяет задание во внешнем контексте.Это необходимо для того, чтобы ошибки, возникающие в loadValue, не распространялись сразу, а вместо этого регистрировались в CompletableFuture, чтобы кэш мог реагировать на них соответствующим образом.
  • executor, чтобыконфигурация кэша с учетом потоков все еще соблюдается.

Это означает, что контекст чувствителен к тому, откуда был вызван get, что может быть неправильным поведением, если кэшразделены между различными частями приложения.Тем не менее, я думаю, что это поведение лучше, чем мой оригинальный подход.

Комментарии и мысли по этому поводу очень ценятся!

...