Как сообщения передаются подписчикам в ConnectableFlux - PullRequest
1 голос
/ 13 февраля 2020

Я новичок в Spring WebFlux и нашел эту демонстрацию чата с использованием Spring WebFlux и Websocket по этой ссылке:

https://blog.monkey.codes/how-to-build-a-chat-app-using-webflux-websockets-react/

Основываясь на статье, он сказал:

Суть приложения заключается в соединении WebSocketSessions друг с другом. Это достигается путем подключения потока входящих сообщений каждого сеанса к глобальному издателю. С другой стороны, каждый сеанс подписывается на сообщения, созданные глобальным издателем.

Конфигурация Reactor:

    @Bean
    public UnicastProcessor<Event> eventPublisher(){
        return UnicastProcessor.create();
    }

    @Bean
    public Flux<Event> events(UnicastProcessor<Event> eventPublisher) {
        return eventPublisher
                .replay(25)
                .autoConnect();
    }

    @Bean
    public HandlerMapping webSocketMapping(UnicastProcessor<Event> eventPublisher, Flux<Event> events) {
        Map<String, Object> map = new HashMap<>();
        map.put("/websocket/chat", new ChatSocketHandler(eventPublisher, events));
        SimpleUrlHandlerMapping simpleUrlHandlerMapping = new SimpleUrlHandlerMapping();
        simpleUrlHandlerMapping.setUrlMap(map);

        //Without the order things break :-/
        simpleUrlHandlerMapping.setOrder(10);
        return simpleUrlHandlerMapping;
    }

    @Bean
    public WebSocketHandlerAdapter handlerAdapter() {
        return new WebSocketHandlerAdapter();
    }

Обработчик сокетов чата:

@Override
public Mono<Void> handle(WebSocketSession session) {
    WebSocketMessageSubscriber subscriber = new WebSocketMessageSubscriber(eventPublisher);
    session.receive()
            .map(WebSocketMessage::getPayloadAsText)
            .map(this::toEvent)
            .subscribe(subscriber::onNext, subscriber::onError, subscriber::onComplete);
    return session.send(outputEvents.map(session::textMessage));
}

private static class WebSocketMessageSubscriber {
    private UnicastProcessor<Event> eventPublisher;
    private Optional<Event> lastReceivedEvent = Optional.empty();

    public WebSocketMessageSubscriber(UnicastProcessor<Event> eventPublisher) {
        this.eventPublisher = eventPublisher;
    }

    public void onNext(Event event) {
        lastReceivedEvent = Optional.of(event);
        eventPublisher.onNext(event);
    }

    public void onError(Throwable error) {
        //TODO log error
        error.printStackTrace();
    }

    public void onComplete() {
        lastReceivedEvent.ifPresent(event -> eventPublisher.onNext(
                Event.type(USER_LEFT)
                        .withPayload()
                        .user(event.getUser())
                        .build()));
    }

}

Если один пользователь отправляет сообщение, то другие пользователи могут видеть это, потому что оно передается всем пользователям. Но когда я читаю код, я не понимаю, как ConnectableFlux (события) отправляет сообщение всем сеансам веб-сокетов. Я также попытался прокомментировать код класса UserStats, но он все еще работает нормально. Кто-нибудь может мне это объяснить?

...