Spring Webflux Websocket закрывается до завершения onErrorResume logi c - PullRequest
0 голосов
/ 03 февраля 2020

У меня есть простой сервер, который обрабатывает некоторые очень простые c сообщения и возвращает сообщение об ошибке, если есть проблема. Однако, когда возвращается сообщение об ошибке mono, соединение закрывается до того, как логика onErrorResume c может быть завершена (я вижу, что соединение закрывается до регистрации onErrorResume).

Вот код в вопрос, где session является WebSocketSession:

session.send(
    session.receive()
        .map(messageConverter::read)
        .flatMap { messageHandlerStrategy.handle(it, session) }
        .onErrorResume {
            logger.error(it) { "Error happened handling message from client with the message" }
            if (it.message != null) {
                AlertMessage(it.message!!)
            } else {
                AlertMessage("Unknown error occurred")
            }.toMono()
        }
        .filter {
            logger.info { "Filtering if is Message: ${it is Message<Any>}" }
            it is Message<Any>
        }
        .cast(Message::class.java)
        .map { messageConverter.write(session, it) }
        .doFinally { logger.info { "${session.id} CLOSE." } }
        .log()
)

Так что, если messageHandlerStrategy возвращает Mono.error, ожидается, что оно будет заключено в AlertMessage и возвращено.

Для ясности я использую Kotlin с Spring Boot 2.2.4.RELEASE и позволяю ему вводить правильные версии всего (например, reactor-core-3.3.2.RELEASE).

1 Ответ

0 голосов
/ 04 февраля 2020

Я понял, что был глуп. Когда Mono.error возвращается из messageHandlerStrategy.handle, onErrorResume завершает издателя, на которого он подписан, и создает нового издателя для всего остального, на которое можно подписаться. Итак, издатель session.receive завершается, поэтому закрывает сессию. Я просто обновил messageHandlerStrategy.handle, чтобы вернуть соответствующий Mono из AlertMessage для последующей отправки. Не уверен, как это было бы возможно, если бы у меня не было контроля над классом-нарушителем.

Для справки, я понял это из JavaDo c (https://projectreactor.io/docs/core/release/api/reactor/core/publisher/Mono.html#onErrorResume - java. util.function.Function- ), в частности, там, где говорится: «Подписаться на резервный издатель при возникновении ошибки, соответствующей данному типу, используя функцию для выбора резервного варианта в зависимости от ошибки». и на диаграмме показано, что используется новый издатель.

Редактировать: Альтернативой является установка onErrorResume внутри flatMap (поэтому он подписан на издателя messageHandlerStrategy.handle). Тогда это выглядит так:

session.send(
    session.receive()
        .map(messageConverter::read)
        .flatMap { 
            messageHandlerStrategy.handle(it, session)
                .onErrorResume { throwable ->
                    logger.error(throwable) { "Error happened handling message from client with the message" }
                    if (throwable.message != null) {
                        AlertMessage(throwable.message!!)
                    } else {
                        AlertMessage("Unknown error occurred")
                    }.toMono()
                }
        }
        .filter {
            logger.info { "Filtering if is Message: ${it is Message<Any>}" }
            it is Message<Any>
        }
        .cast(Message::class.java)
        .map { messageConverter.write(session, it) }
        .doFinally { logger.info { "${session.id} CLOSE." } }
        .log()
)
...