Реализация поведения 202 ACCEPTED - Retry-After с помощью RSocket и Project Reactor - PullRequest
0 голосов
/ 29 мая 2020

Я реализую типичный вариант использования, в котором клиент запрашивает ресурс, который будет генерироваться асинхронно. Таким образом, идентификатор ресурса генерируется и сразу же возвращается:

1. CLIENT ---(POST /request-resource)--->  SERVER
2. SERVER (Generates resID, launches async process) ---(202 Accepted - resID)---> CLIENT

В этот момент в СЕРВЕРЕ существует фоновая задача, которая в конечном итоге выдаст результат и сохранит его в базе данных, связанной с resID. КЛИЕНТ будет периодически запрашивать ресурс, повторяя попытки до тех пор, пока он не станет доступным:

3. CLIENT ---(/resource/resID)--->  SERVER (checks in Postgres using reactive driver)
4. SERVER ---(404 - Retry-After 5)---> CLIENT
5. CLIENT ---(/resource/resID)--->  SERVER (checks in Postgres using reactive driver)
6. SERVER ---(200 - JSON Payload)---> CLIENT

Я, хотя RSocket идеально подходит, чтобы избежать этой бесконечной попытки КЛИЕНТА, пока ресурс не станет доступным (шаги 3. на).

Какая модель взаимодействия больше подходит для этой проблемы и как ее реализовать?

Рассмотрим репозиторий следующим образом: ResourceRepository.Mono<Result> getResult(String resID)

Если бы я выбрал модель взаимодействия запрос / ответ , я бы оказался в том же случае, что и раньше. Если только не было способа заставить Mono повторять попытки, пока не будет результата. Возможно ли это?

С запросом / потоком Я мог бы возвращать такие результаты, как Flux<Response> с response.status = PROCESSING, пока запрос к Postgre не вернет результат, тогда Flux будет иметь элемент с response.status = OK, и Flux завершится. Максимальное время потребуется для завершения sh потока без результата в течение заданного периода. В таком случае, как я мог это сделать?

Мне нужно было бы создать Flux, который периодически излучает (с максимальным тайм-аутом периода), имея элемент без результата, когда репозиторий возвращает пустой Mono, или фактическое значение, когда оно есть в репозитории te, завершение Флюс.

1 Ответ

0 голосов
/ 05 июня 2020

Решение этой проблемы с использованием RSocket с моделью взаимодействия RequestResponse, ожидающей, пока ресурс не станет доступным в БД. Ключ заключался в использовании оператора repeatWhenEmpty:

    @MessageMapping("request-resource")
    fun getResourceWebSocket(resourceRequest: ResourceRequest): Mono<Resource> {
        return resourceService.sendResourceRequestProcessing(resourceRequest)
    }

    override fun sendResourceRequestMessage(resourceRequest: ResourceRequest): Mono<Resource> {
        val resourceId = randomUUID().toString()
        return Mono.fromCallable {
            sendKafkaResourceProcessingRequestMessage(resourceId, resourceRequest)
        }.then(poolResourceResponse(resourceId))
    }
    private fun poolResourceResponse(resourceId: String): Mono<Resource> {
        return resourceRepository.findByResourceId(resourceId)
                .repeatWhenEmpty(30) { longFlux ->
                    longFlux.delayElements(Duration.ofSeconds(1))
                            .doOnNext { logger.info("Repeating {}", it) }
                }
    }
...