Я начинаю с реактивных веб-сокетов, используя Spring Boot 2.1.3. Я создал WebSocketHandler
реализацию, подобную этой:
@Override
public Mono<Void> handle(WebSocketSession session) {
Flux<EfficiencyData> flux = service.subscribeToEfficiencyData(1);
var publisher = flux.map( o -> {
try {
return objectMapper.writeValueAsString(o);
} catch (JsonProcessingException e) {
e.printStackTrace();
return null;
}
}).map(session::textMessage)
.delayElements(Duration.ofSeconds(1));
return session.send(publisher);
}
Это работает, если я подключаюсь, я сериализуюсь EfficiencyData
каждую секунду в моем клиенте websocket.
Однако я хочу отреагировать на запрос, поступающий из веб-сокета, чтобы сообщить service
, для какого идентификатора мне нужны данные. Мне удалось получить информацию запроса следующим образом:
@Override
public Mono<Void> handle(WebSocketSession session) {
return session.send(session.receive().map(webSocketMessage -> {
int id = Integer.parseInt(webSocketMessage.getPayloadAsText());
return session.textMessage("Subscribing with id " + id);
}));
Теперь я понятия не имею, как объединить эти 2 реализации?
Я надеялся сделать что-то вроде этого:
@Override
public Mono<Void> handle(WebSocketSession session) {
return session.send(session.receive().map(webSocketMessage -> {
int id = Integer.parseInt(webSocketMessage.getPayloadAsText());
Flux<EfficiencyData> flux = service.subscribeToEfficiencyData(id);
var publisher = flux.map( o -> {
try {
return objectMapper.writeValueAsString(o);
} catch (JsonProcessingException e) {
e.printStackTrace();
return null;
}
}).map(session::textMessage)
.delayElements(Duration.ofSeconds(1));
return publisher; //Does not compile
}));
Но это не компилируется, поскольку publisher
является Flux<WebSocketMessage>
и должно быть Publisher<WebSocketMessage>
. Как с этим обращаться?
EDIT:
Следуя примеру Javadoc WebSocketHandler
, я попробовал это:
@Override
public Mono<Void> handle(WebSocketSession session) {
Flux<EfficiencyData> flux =
session.receive()
.map(webSocketMessage -> Integer.parseInt(webSocketMessage.getPayloadAsText()))
.concatMap(service::subscribeToEfficiencyData);
Mono<Void> input = flux.then();
Mono<Void> output = session.send(flux.map(data -> session.textMessage(data.toString()))).then();
return Mono.zip(input, output).then();
}
Но это просто немедленно отключает клиента websocket, ничего не делая.