springboot2 + webflux + websocket - PullRequest
       13

springboot2 + webflux + websocket

0 голосов
/ 18 января 2019

Я использую Spring boot 2 с Webflux на JDK 11. Я написал следующий класс конфигурации:

@Configuration
public class WebSocketConfiguration {

    @Autowired
    @Bean
    public HandlerMapping webSocketMapping(final MyWebSocketHandler server) {
        final Map<String, WebSocketHandler> map = new HashMap<>();
        map.put("/echo", server);

        final SimpleUrlHandlerMapping mapping = new SimpleUrlHandlerMapping();
        mapping.setOrder(Ordered.HIGHEST_PRECEDENCE);
        mapping.setUrlMap(map);
        return mapping;
    }
    @Bean
    public WebSocketHandlerAdapter handlerAdapter() {
        return new WebSocketHandlerAdapter();
    }
}

И следующий WebSocketHandler метод:

@Override
public Mono<Void> handle(WebSocketSession webSocketSession) {
    return webSocketSession.send(webSocketSession.receive().
            map(msg -> webSocketSession
                    .textMessage("response:jack ->" + msg.getPayloadAsText())));
}

Теперь я могу получить все, что отправлю, например:

отправка клиента: 4545

клиент получает: ответ : jack -> 4545

Я хочу знать, как я могу отправить сообщение клиенту, когда клиент не отправляет мне сообщение, мне нужны push-сообщения в любое время 101

Как мне отправить пользовательское сообщение в любое время, а не отвечать тем же входным сообщением?

1 Ответ

0 голосов
/ 09 августа 2019

Вы можете прочитать об этом в статье моего блога http://kojotdev.com/2019/08/spring-webflux-websocket-with-vue-js/.

Вам необходимо изменить WebSocketHandler на:

private final GreetingsPublisher greetingsPublisher;
private final Flux<String> publisher;

public ReactiveWebSocketHandler(GreetingsPublisher greetingsPublisher) {
    this.greetingsPublisher = greetingsPublisher;
    this.publisher = Flux.create(greetingsPublisher).share();
}

@Override
public Mono<Void> handle(WebSocketSession webSocketSession) {
    final Flux<WebSocketMessage> message = publisher
            .map(greetings -> webSocketSession.textMessage(greetings));

    return webSocketSession.send(message);
}

И добавить GreetingPublisher

@Component
public class GreetingsPublisher implements Consumer<FluxSink<String>> {
    private static final Logger log = LoggerFactory.getLogger(GreetingsPublisher.class);

    private final BlockingQueue<String> queue = new LinkedBlockingQueue<>();
    private final Executor executor = Executors.newSingleThreadExecutor();

    public boolean push(String greeting) {
        return queue.offer(greeting);
    }

    @Override
    public void accept(FluxSink<String> sink) {
        this.executor.execute(() -> {
            while (true) {
                Try.of(() -> {
                    final String greeting = queue.take();
                    return sink.next(greeting);
                })
                        .onFailure(ex -> log.error("Could not take greeting from queue", ex));

            }
        });
    }
}

Это bean-компонент, поэтому, куда бы вы ни внедрили его и вызвали метод push, он отправит сообщение с помощью WebSocket. Например:

@Controller
public class GreetingsController {

    private final GreetingsPublisher greetingsPublisher;

    public GreetingsController(GreetingsPublisher greetingsPublisher) {
        this.greetingsPublisher = greetingsPublisher;
    }

    @Bean
    RouterFunction<ServerResponse> pushMessage() {
        return route(GET("/push"),
                request -> {
                    greetingsPublisher.push("Send a new message with WebSocket");
                    return ServerResponse.ok().body(fromObject("websocket message sent"));
                });
    }
}

Сначала соединитесь с WebSocket и откройте браузер на локальном хосте: 8080 / push. Сообщение должно быть отправлено.

Обратите внимание, что это похоже на ошибку в Spring Boot 2.1.7, и я упоминал об этом в своем блоге.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...