с пружинным чехлом rsocket захватывает тип рамки отмены - PullRequest
1 голос
/ 09 марта 2020

У меня есть реализация rsocket с пружинной загрузкой, где, если клиент отменяет или закрывает свой запрос rsocket, я хочу отменить другие регистрации подписки на сервере.

В журналах на сервере весенней загрузки я вижу, что сообщение об отмене отправлено или получено:

WARN i.r.t.n.s.WebsocketServerTransport$1 [reactor-http-nio-3] received WebSocket Close Frame - connection is closing
INFO r.u.Loggers$Slf4JLogger [reactor-http-nio-3] cancel()

Как мне захватить и обработать этот сигнал отмены?

Я пытался отменить конечные точки, но они не фиксируют сигнал:

@MessageMapping("cancel")
Flux<Object> onCancel() {
    log.info("Captured cancel signal");
}

или

@ConnectMapping("cancel")
Flux<Object> onCancel2() {
    log.info("Captured cancel2 signal");
}

Этот вопрос по отмене подписок возможно связан, и этот вопрос по обнаружению отключения веб-розетки

Ответы [ 2 ]

1 голос
/ 30 апреля 2020

Это не был хорошо поставленный вопрос. Ответ в том, что

INFO r.u.Loggers$Slf4JLogger [reactor-http-nio-3] cancel()

виден FluxSink, который был установлен с исходной конечной точки @MessageMapping.

Например:

@MessageMapping("hello")
Flux<Object> hello(@Payload String message) {       
    return myService.generateWorld(message);
}

В myService классе

public Flux<Object> generateWorld(String hello) {
    EmitterProcessor<Object> emitter = EmitterProcessor.create();
    FluxSink<Object> sink = emitter.sink(FluxSink.OverflowStrategy.LATEST);

    // doing stuff with sink here
    sink.next(stuff());

    // This part will handle a cancel from the client
    sink.onCancel(() -> {log.info("********** SINK.onCancel ***********");});

    return Flux.from(emitter));  
}

sink.onCancel() будет обрабатывать отмену потока к конечной точке hello от клиента.

1 голос
/ 30 апреля 2020

Для захвата сигнала отмены вы можете использовать подписку на событие onClose().

В вашем контроллере

@Controller
class RSocketConnectionController {

    @ConnectMapping("client-id")
    fun onConnect(rSocketRequester: RSocketRequester, clientId: String) {
//        rSocketRequester.rsocket().dispose()   //to reject connection
        rSocketRequester
                .rsocket()
                .onClose()
                .subscribe(null, null, {
                    log.info("{} just disconnected", clientId)

                    //TODO here whatever you want
                })
    }
}

Ваш клиент должен правильно отправить фрейм SETUP, чтобы вызвать это @ConnectMapping. Если вы используете rsocket-js, вам нужно добавить такие данные:

const client = new RSocketClient({
        // send/receive JSON objects instead of strings/buffers
        serializers: {
          data: JsonSerializer,
          metadata: IdentitySerializer
        },
        setup: {
          //for connection mapping on server
          payload: {
            data: 'unique-client-id',   //TODO you can receive this data on server side
            metadata: String.fromCharCode("client-id".length) + "client-id"
          },
          // ms btw sending keepalive to server
          keepAlive: 60000,
.....
        }
});

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...