Spring Boot Reactive Websoket - блокируйте поток, пока не получите всю информацию от клиента - PullRequest
0 голосов
/ 08 июня 2018

Я играю с реактивным веб-сокетом (при весенней загрузке 2.1.0), но у меня есть проблема при попытке заблокировать поток, ожидающий информацию о клиенте.

Я знаю, что блокировка - это неправильный способдля обработки реактивного потока, но мне нужно получить некоторую информацию от клиента перед продолжением (то есть: ключ авторизации, идентификатор), есть приемлемый способ управлять этим реактивным способом?

Например:Я хочу, чтобы клиент отправил ключ авторизации и идентификатор подписки (чтобы подписаться только на определенное событие), и я отправлю поток рассылки только тогда, когда у меня есть обе данные, или закрою сеанс, если информация недействительна

Я пытался управлять проверкой внутри метода handle

     webSocketSession.receive().subscribe(inMsg -> {

            if(!inMsg.getPayloadAsText().equals("test-key")) {
                log.info("AUTHORIZATION ERROR");
                webSocketSession.close();
            }
        });

Но этот способ не работает и не корректен, поскольку управляет уничтожением сеанса асинхронным способом и в любом случае также при получениисообщение с неверным ключом сеанс все еще остается живым

Другой способ - сохранить «сеанс» с использованием встроенной памятичтобы отслеживать полученную информацию и обрабатывать ее на уровне бизнес-логики

Я застрял в поиске правильного способа управления ею реагирующим образом

Моя отправная точка была в следующем примере: http://www.baeldung.com/spring-5-reactive-websockets

Заранее спасибо

ДОПОЛНИТЕЛЬНАЯ ИНФОРМАЦИЯ:

Используя пример https://github.com/eugenp/tutorials/tree/master/spring-5-reactive/src/main/java/com/baeldung/reactive/websocket

Я создал базовое приложение для весенней загрузки, и я 'Мы добавили класс конфигурации для веб-сокета:

@Configuration 
class WebConfig {

    @Bean
    public HandlerMapping handlerMapping() {
        Map<String, WebSocketHandler> map = new HashMap<>();
        map.put("/event-emitter-test", new MyWebSocketHandler());

        SimpleUrlHandlerMapping mapping = new SimpleUrlHandlerMapping();
        mapping.setUrlMap(map);
        mapping.setOrder(-1); // before annotated controllers
        return mapping;
    }

    @Bean
    public WebSocketHandlerAdapter handlerAdapter() {
        return new WebSocketHandlerAdapter();
    }
}

Ниже приведен основной класс, содержащий основной метод веб-сокета (измененный с использованием моего фактического кода):

@Component
public class MyWebSocketHandler  implements WebSocketHandler {

    @Autowired
    private WebSocketHandler webSocketHandler;

    @Bean
    public HandlerMapping webSocketHandlerMapping() {
        Map<String, WebSocketHandler> map = new HashMap<>();
        map.put("/event-emitter-test", webSocketHandler);

        SimpleUrlHandlerMapping handlerMapping = new SimpleUrlHandlerMapping();
        handlerMapping.setOrder(1);
        handlerMapping.setUrlMap(map);
        return handlerMapping;
    }

    @Override
    public Mono<Void> handle(WebSocketSession webSocketSession) {

       webSocketSession.receive().subscribe(inMsg -> {

        if(!inMsg.getPayloadAsText().equals("test-key")) {
            // log.info("AUTHORIZATION ERROR");
            webSocketSession.close();
        }
    });

    List<String> data = new ArrayList<String>(Arrays.asList("{A}", "{B}", "{C}"));
    Flux<String> intervalFlux = Flux
                                .interval(Duration.ofMillis(500))
                                .map(tick -> {
                                    if (tick < data.size())
                                        return "item " + tick + ": " + data.get(tick.intValue());
                                    return "Done (tick == data.size())";
                                });

        return webSocketSession.send(intervalFlux
          .map(webSocketSession::textMessage));
    }


}

1 Ответ

0 голосов
/ 08 июня 2018

Вы не должны subscribe или block в реактивном конвейере - вы можете обнаружить, что находитесь внутри такого конвейера, потому что тип возвращаемого значения метода-обработчика - Mono<Void>, что означает сигнал о том, что обработкавходящие сообщения готовы.

В вашем случае вы, вероятно, захотите прочитать первое сообщение, проверить, содержит ли оно информацию о подписке, которую вы ожидаете, и отправить сообщения.

public class TestWebSocketHandler implements WebSocketHandler {

    public Mono<Void> handle(WebSocketSession session) {

        return session.receive()
                .map(WebSocketMessage::getPayloadAsText)
                .flatMap(msg -> {
                    SubscriptionInfo info = extract(msg);
                    if (info == null) {
                        return session.close();
                    }
                    else {
                        Mono<WebSocketMessage> message = Mono.just(session.textMessage("message"));
                        return session.send(message);
                    }
                })
                .then();
    }

    SubscriptionInfo extract(String message) {
        //
    }

    class SubscriptionInfo {
        //
    }
}
...