Обслуживание разных конечных точек webflux в разных циклах событий - PullRequest
0 голосов
/ 11 февраля 2020

Я перенес большую базу кода Spring-Web в Spring-WebFlux. Есть два набора конечных точек, некоторые критические конечные точки, которые должны быть быстрыми, и некоторые другие, которые не являются жизненно важными, если они в конечном итоге блокируют работу. Я могу подтвердить, что ни одна из критических конечных точек не блокируется в потоке eventl oop, но проверка всех остальных конечных точек невозможна. Я хотел бы запустить их в другом пуле потоков или eventl oop, поэтому, если они блокируются, они не влияют на критические конечные точки.

Как бы я go об этом?

Я попытался использовать рефлексию, чтобы найти все обработчики, которые должны или не должны быть на вторичном событии l oop, но не могу придумать способ назначить их ему. В качестве альтернативы, возможно ли запускать как Spring-Web, так и Spring-WebFlux, и заставить Spring-Web обслуживать некритические конечные точки?

Мне удалось получить обработчики из реестра обработчиков, например, так:

@Component
class HandlerFinder(val requestMappingHandlerMappings: List<RequestMappingHandlerMapping>) {

    @PostConstruct
    fun start() {
        for (rmhm : RequestMappingHandlerMapping in requestMappingHandlerMappings) {
            val handlerMethods: Map<RequestMappingInfo, HandlerMethod> = rmhm.handlerMethods

            for ((mapping, method) in handlerMethods) {
                for (urlPattern in mapping.patternsCondition.patterns) {
                    log.info("${method.beanType.name}#${method.method.name} <-- ${urlPattern}")
                }
            }
        }
    }

    companion object {
        private val log = LoggerFactory.getLogger(HandlerFinder::class.java)
    }
}

Я думаю, что следующим шагом будет как-то изменить обработчики, чтобы они были обернуты во что-то вроде ниже, но я не уверен, как изменить это в реестре обработчиков:

Mono.fromCallable(() -> { ... handler method goes here ...}).publishOn(Schedulers.elastic());

Я знаю, что могу отменить регистрацию с помощью rmhm.ungreisterMapping(mapping), а затем заново зарегистрировать его с чем-то вроде rmhm.registerMapping(mapping, method.bean, method.method), но мне нужно будет создать новые методы, чтобы каким-то образом обернуть существующие.

Ответы [ 2 ]

1 голос
/ 13 февраля 2020

я бы предложил взять ваши критические конечные точки, которые вы не хотите блокировать, и установить subscribeOn

в качестве документации состояний для subscribeOn

Изменяет поток, из которого подписывается вся цепочка операторов

Ваш выбор publishOn только изменит контекст с того места, где он был объявлен, и вперед. subscribeOn установит полную подписку на выделенный планировщик.

Таким образом, вы можете разделить то, как вы хотите, чтобы ваши конечные точки обрабатывались внутри разных планировщиков внутри приложения.

0 голосов
/ 17 февраля 2020

Я решил это, создав WebFilter и изменив subscribeOn, как предполагает ответ Томаса.

@Component
class SchedulerWebFilter(): WebFilter {
    val executorService = Executors.newFixedThreadPool(32)
    private val pathsWhitelist: Set<String> = ...

    override fun filter(exchange: ServerWebExchange, chain: WebFilterChain): Mono<Void> {
        val path = exchange.request.path.toString()
        return if (!pathsWhitelist.contains(path)) {
            chain.filter(exchange).subscribeOn(Schedulers.fromExecutor(executorService))
        } else {
            chain.filter(exchange)
        }
    }
}
...