Я новичок в Spring WebFlux и нашел эту демонстрацию чата с использованием Spring WebFlux и Websocket по этой ссылке:
https://blog.monkey.codes/how-to-build-a-chat-app-using-webflux-websockets-react/
Основываясь на статье, он сказал:
Суть приложения заключается в соединении WebSocketSessions друг с другом. Это достигается путем подключения потока входящих сообщений каждого сеанса к глобальному издателю. С другой стороны, каждый сеанс подписывается на сообщения, созданные глобальным издателем.
Конфигурация Reactor:
@Bean
public UnicastProcessor<Event> eventPublisher(){
return UnicastProcessor.create();
}
@Bean
public Flux<Event> events(UnicastProcessor<Event> eventPublisher) {
return eventPublisher
.replay(25)
.autoConnect();
}
@Bean
public HandlerMapping webSocketMapping(UnicastProcessor<Event> eventPublisher, Flux<Event> events) {
Map<String, Object> map = new HashMap<>();
map.put("/websocket/chat", new ChatSocketHandler(eventPublisher, events));
SimpleUrlHandlerMapping simpleUrlHandlerMapping = new SimpleUrlHandlerMapping();
simpleUrlHandlerMapping.setUrlMap(map);
//Without the order things break :-/
simpleUrlHandlerMapping.setOrder(10);
return simpleUrlHandlerMapping;
}
@Bean
public WebSocketHandlerAdapter handlerAdapter() {
return new WebSocketHandlerAdapter();
}
Обработчик сокетов чата:
@Override
public Mono<Void> handle(WebSocketSession session) {
WebSocketMessageSubscriber subscriber = new WebSocketMessageSubscriber(eventPublisher);
session.receive()
.map(WebSocketMessage::getPayloadAsText)
.map(this::toEvent)
.subscribe(subscriber::onNext, subscriber::onError, subscriber::onComplete);
return session.send(outputEvents.map(session::textMessage));
}
private static class WebSocketMessageSubscriber {
private UnicastProcessor<Event> eventPublisher;
private Optional<Event> lastReceivedEvent = Optional.empty();
public WebSocketMessageSubscriber(UnicastProcessor<Event> eventPublisher) {
this.eventPublisher = eventPublisher;
}
public void onNext(Event event) {
lastReceivedEvent = Optional.of(event);
eventPublisher.onNext(event);
}
public void onError(Throwable error) {
//TODO log error
error.printStackTrace();
}
public void onComplete() {
lastReceivedEvent.ifPresent(event -> eventPublisher.onNext(
Event.type(USER_LEFT)
.withPayload()
.user(event.getUser())
.build()));
}
}
Если один пользователь отправляет сообщение, то другие пользователи могут видеть это, потому что оно передается всем пользователям. Но когда я читаю код, я не понимаю, как ConnectableFlux (события) отправляет сообщение всем сеансам веб-сокетов. Я также попытался прокомментировать код класса UserStats, но он все еще работает нормально. Кто-нибудь может мне это объяснить?