Вы можете прочитать об этом в статье моего блога http://kojotdev.com/2019/08/spring-webflux-websocket-with-vue-js/.
Вам необходимо изменить WebSocketHandler
на:
private final GreetingsPublisher greetingsPublisher;
private final Flux<String> publisher;
public ReactiveWebSocketHandler(GreetingsPublisher greetingsPublisher) {
this.greetingsPublisher = greetingsPublisher;
this.publisher = Flux.create(greetingsPublisher).share();
}
@Override
public Mono<Void> handle(WebSocketSession webSocketSession) {
final Flux<WebSocketMessage> message = publisher
.map(greetings -> webSocketSession.textMessage(greetings));
return webSocketSession.send(message);
}
И добавить GreetingPublisher
@Component
public class GreetingsPublisher implements Consumer<FluxSink<String>> {
private static final Logger log = LoggerFactory.getLogger(GreetingsPublisher.class);
private final BlockingQueue<String> queue = new LinkedBlockingQueue<>();
private final Executor executor = Executors.newSingleThreadExecutor();
public boolean push(String greeting) {
return queue.offer(greeting);
}
@Override
public void accept(FluxSink<String> sink) {
this.executor.execute(() -> {
while (true) {
Try.of(() -> {
final String greeting = queue.take();
return sink.next(greeting);
})
.onFailure(ex -> log.error("Could not take greeting from queue", ex));
}
});
}
}
Это bean-компонент, поэтому, куда бы вы ни внедрили его и вызвали метод push, он отправит сообщение с помощью WebSocket. Например:
@Controller
public class GreetingsController {
private final GreetingsPublisher greetingsPublisher;
public GreetingsController(GreetingsPublisher greetingsPublisher) {
this.greetingsPublisher = greetingsPublisher;
}
@Bean
RouterFunction<ServerResponse> pushMessage() {
return route(GET("/push"),
request -> {
greetingsPublisher.push("Send a new message with WebSocket");
return ServerResponse.ok().body(fromObject("websocket message sent"));
});
}
}
Сначала соединитесь с WebSocket и откройте браузер на локальном хосте: 8080 / push. Сообщение должно быть отправлено.
Обратите внимание, что это похоже на ошибку в Spring Boot 2.1.7, и я упоминал об этом в своем блоге.