Сервис Kotlin с очередью запросов - PullRequest
0 голосов
/ 28 апреля 2019

Я хотел бы разработать сервис со следующим API:

suspend fun getUsers(request: Request): List<User>

Под капотом я бы отправил запрос на сервер (неважно, как, нодопустим, это реактивный WebClient), но вот хитрость: я могу отправлять запросы только каждые 500 мс, в противном случае я получу сообщение об ошибке.

Может ли кто-нибудь порекомендовать мне, как я мог бы реализовать это, например,Таким образом, когда я вызываю getUsers из сопрограммы, она приостанавливается, единица работы добавляется в некоторую очередь службы, которая имеет этот метод, затем реализуется в определенный момент времени и возвращает результат?

Я предполагаю, что могу использовать некоторые ReceiveChannel в качестве очереди, иметь цикл for для своих элементов с delay внутри, но я немного растерялся, где разместить эту логику.Должно ли это быть как фоновый метод, который будет работать вечно и вызываться getUsers?Вероятно, метод close никогда не будет вызван, поэтому этот метод также можно приостановить, но как мне передать значение из этого метода бесконечного выполнения в getUsers, для которого нужны результаты?

РЕДАКТИРОВАТЬ

В данный момент я думаю о решении, подобном этому:

private const val REQUEST_INTERVAL = 500

@Service
class DelayedRequestSenderImpl<T> : DelayedRequestSender<T> {
    private var lastRequestTime: LocalDateTime = LocalDateTime.now()
    private val requestChannel: Channel<Deferred<T>> = Channel()

    override suspend fun requestAsync(block: () -> T): Deferred<T> {
        val deferred = GlobalScope.async(start = CoroutineStart.LAZY) { block() }
        requestChannel.send(deferred)
        return deferred
    }

    @PostConstruct
    private fun startRequestProcessing() = GlobalScope.launch {
        for (request in requestChannel) {
            val now = LocalDateTime.now()
            val diff = ChronoUnit.MILLIS.between(lastRequestTime, now)
            if (diff < REQUEST_INTERVAL) {
                delay(REQUEST_INTERVAL - diff)
                lastRequestTime = now
            }
            request.start()
        }
    }
}

Проблема, которую я вижу здесь, заключается в том, что мне нужно генерировать класс, чтобы сделать requestChannel универсальный, так как результатом запроса может быть что угодно.Но это означает, что каждый экземпляр DelayedRequestSender будет привязан к определенному типу.Любой совет, как этого избежать?

РЕДАКТИРОВАТЬ 2

Вот улучшенная версия.Единственный возможный поток, который я вижу в данный момент, заключается в том, что мы должны сделать метод @PostConstruct публичным, чтобы писать любые тесты, если мы хотим или используем отражение.

Идея заключалась в том, чтобы не использовать GlobalScope итакже есть отдельный Job для метода обработки.Это хороший подход?

interface DelayingSupplier {
    suspend fun <T> supply(block: () -> T): T
}

@Service
class DelayingSupplierImpl(@Value("\${vk.request.interval}") private val interval: Int) : DelayingSupplier {
    private var lastRequestTime: LocalDateTime = LocalDateTime.now()
    private val requestChannel: Channel<Deferred<*>> = Channel()
    private val coroutineScope = CoroutineScope(EmptyCoroutineContext)

    override suspend fun <T> supply(block: () -> T): T {
        val deferred = coroutineScope.async(start = CoroutineStart.LAZY) { block() }
        requestChannel.send(deferred)
        return deferred.await()
    }

    @PostConstruct
    fun startProcessing() = coroutineScope.launch(context = Job(coroutineScope.coroutineContext[Job])) {
        for (request in requestChannel) {
            val now = LocalDateTime.now()
            val diff = ChronoUnit.MILLIS.between(lastRequestTime, now)
            if (diff < interval) {
                delay(interval - diff)
            }
            lastRequestTime = LocalDateTime.now()
            request.start()
        }
    }
}

Ответы [ 2 ]

1 голос
/ 29 апреля 2019

Я бы порекомендовал:

  • толкает ваши дженерики до уровня функции
  • использование актера вместо реализации сопрограммы (но, возможно, вы предпочитаете это).

В любом случае, это решение должно позволить вам использовать один экземпляр вашей очереди для обработки задержки всех запросов независимо от типа возврата. (Извините, я переименовал некоторые вещи, чтобы помочь моей собственной концептуализации, надеюсь, это все еще имеет смысл):

private const val REQUEST_INTERVAL = 500

interface DelayedRequestHandler {

    suspend fun <T> handleWithDelay(block: () -> T): T

}

class DelayedRequestHandlerImpl(requestInterval: Int = REQUEST_INTERVAL) : DelayedRequestHandler, CoroutineScope {
    private val job = Job()
    override val coroutineContext = Dispatchers.Unconfined + job
    private val delayedHandlerActor = delayedRequestHandlerActor(requestInterval)

    override suspend fun <T> handleWithDelay(block: () -> T): T {
        val result = CompletableDeferred<T>()
        delayedHandlerActor.send(DelayedHandlerMsg(result, block))
        return result.await()
    }
}

private data class DelayedHandlerMsg<RESULT>(val result: CompletableDeferred<RESULT>, val block: () -> RESULT)

private fun CoroutineScope.delayedRequestHandlerActor(requestInterval: Int) = actor<DelayedHandlerMsg<*>>() {
    var lastRequestTime: LocalDateTime = LocalDateTime.now()
    for (message in channel) {
        try {
            println("got a message processing")
            val now = LocalDateTime.now()
            val diff = ChronoUnit.MILLIS.between(lastRequestTime, now)
            if (diff < requestInterval) {
                delay(requestInterval - diff)
            }
            lastRequestTime = LocalDateTime.now()
            @Suppress("UNCHECKED_CAST")
            val msgCast = message as DelayedHandlerMsg<Any?>
            val result = msgCast.block()
            println(result)
            msgCast.result.complete(result)
        } catch (e: Exception) {
            message.result.completeExceptionally(e)
        }
    }
}


fun main() = runBlocking {
    val mydelayHandler = DelayedRequestHandlerImpl(2000)
    val jobs = List(10) {
        launch {
            mydelayHandler.handleWithDelay {
                "Result $it"
            }
        }
    }
    jobs.forEach { it.join() }
}
0 голосов
/ 03 мая 2019

Так что это последняя реализация, которую я придумалОбратите внимание на SupevisorJob, так как мы не хотим, чтобы обработка прекращалась в случае сбоя одного из запросов, что вполне возможно и нормально (по крайней мере, в моем случае).

Кроме того, параметр, предложенный @Laurence, можетбыть лучше, но я решил пока не использовать актеров из-за того, что API помечен как устаревший.

@Service
class DelayingRequestSenderImpl(@Value("\${vk.request.interval}") private val interval: Int) : DelayingRequestSender {
    private var lastRequestTime: LocalDateTime = LocalDateTime.now()
    private val requestChannel: Channel<Deferred<*>> = Channel()
    //SupervisorJob is used because we want to have continuous processing of requestChannel
    //even if one of the requests fails
    private val coroutineScope = CoroutineScope(SupervisorJob())

    override suspend fun <T> request(block: () -> T): T {
        val deferred = coroutineScope.async(start = CoroutineStart.LAZY) { block() }
        requestChannel.send(deferred)
        return deferred.await()
    }

    @PostConstruct
    fun startProcessing() = coroutineScope.launch {
        for (request in requestChannel) {
            val now = LocalDateTime.now()
            val diff = ChronoUnit.MILLIS.between(lastRequestTime, now)
            if (diff < interval) {
                delay(interval - diff)
            }
            lastRequestTime = LocalDateTime.now()
            request.start()
        }
    }
}
...