Я играю с реактивным веб-сокетом (при весенней загрузке 2.1.0), но у меня есть проблема при попытке заблокировать поток, ожидающий информацию о клиенте.
Я знаю, что блокировка - это неправильный способдля обработки реактивного потока, но мне нужно получить некоторую информацию от клиента перед продолжением (то есть: ключ авторизации, идентификатор), есть приемлемый способ управлять этим реактивным способом?
Например:Я хочу, чтобы клиент отправил ключ авторизации и идентификатор подписки (чтобы подписаться только на определенное событие), и я отправлю поток рассылки только тогда, когда у меня есть обе данные, или закрою сеанс, если информация недействительна
Я пытался управлять проверкой внутри метода handle
webSocketSession.receive().subscribe(inMsg -> {
if(!inMsg.getPayloadAsText().equals("test-key")) {
log.info("AUTHORIZATION ERROR");
webSocketSession.close();
}
});
Но этот способ не работает и не корректен, поскольку управляет уничтожением сеанса асинхронным способом и в любом случае также при получениисообщение с неверным ключом сеанс все еще остается живым
Другой способ - сохранить «сеанс» с использованием встроенной памятичтобы отслеживать полученную информацию и обрабатывать ее на уровне бизнес-логики
Я застрял в поиске правильного способа управления ею реагирующим образом
Моя отправная точка была в следующем примере: http://www.baeldung.com/spring-5-reactive-websockets
Заранее спасибо
ДОПОЛНИТЕЛЬНАЯ ИНФОРМАЦИЯ:
Используя пример https://github.com/eugenp/tutorials/tree/master/spring-5-reactive/src/main/java/com/baeldung/reactive/websocket
Я создал базовое приложение для весенней загрузки, и я 'Мы добавили класс конфигурации для веб-сокета:
@Configuration
class WebConfig {
@Bean
public HandlerMapping handlerMapping() {
Map<String, WebSocketHandler> map = new HashMap<>();
map.put("/event-emitter-test", new MyWebSocketHandler());
SimpleUrlHandlerMapping mapping = new SimpleUrlHandlerMapping();
mapping.setUrlMap(map);
mapping.setOrder(-1); // before annotated controllers
return mapping;
}
@Bean
public WebSocketHandlerAdapter handlerAdapter() {
return new WebSocketHandlerAdapter();
}
}
Ниже приведен основной класс, содержащий основной метод веб-сокета (измененный с использованием моего фактического кода):
@Component
public class MyWebSocketHandler implements WebSocketHandler {
@Autowired
private WebSocketHandler webSocketHandler;
@Bean
public HandlerMapping webSocketHandlerMapping() {
Map<String, WebSocketHandler> map = new HashMap<>();
map.put("/event-emitter-test", webSocketHandler);
SimpleUrlHandlerMapping handlerMapping = new SimpleUrlHandlerMapping();
handlerMapping.setOrder(1);
handlerMapping.setUrlMap(map);
return handlerMapping;
}
@Override
public Mono<Void> handle(WebSocketSession webSocketSession) {
webSocketSession.receive().subscribe(inMsg -> {
if(!inMsg.getPayloadAsText().equals("test-key")) {
// log.info("AUTHORIZATION ERROR");
webSocketSession.close();
}
});
List<String> data = new ArrayList<String>(Arrays.asList("{A}", "{B}", "{C}"));
Flux<String> intervalFlux = Flux
.interval(Duration.ofMillis(500))
.map(tick -> {
if (tick < data.size())
return "item " + tick + ": " + data.get(tick.intValue());
return "Done (tick == data.size())";
});
return webSocketSession.send(intervalFlux
.map(webSocketSession::textMessage));
}
}