У меня есть это в моей WebSocketHandler
реализации:
@Override
public Mono<Void> handle(WebSocketSession session) {
return session.send(
session.receive()
.flatMap(webSocketMessage -> {
int id = Integer.parseInt(webSocketMessage.getPayloadAsText());
Flux<EfficiencyData> flux = service.subscribeToEfficiencyData(id);
var publisher = flux
.<String>handle((o, sink) -> {
try {
sink.next(objectMapper.writeValueAsString(o));
} catch (JsonProcessingException e) {
e.printStackTrace();
}
})
.map(session::textMessage);
return publisher;
})
);
}
Flux<EfficiencyData>
в настоящее время генерируется для тестирования в службе следующим образом:
public Flux<EfficiencyData> subscribeToEfficiencyData(long weavingLoomId) {
return Flux.interval(Duration.ofSeconds(1))
.map(aLong -> {
longAdder.increment();
return new EfficiencyData(new MachineSpeed(
RotationSpeed.ofRpm(longAdder.intValue()),
RotationSpeed.ofRpm(0),
RotationSpeed.ofRpm(400)));
}).publish().autoConnect();
}
Я использую publish().autoConnect()
чтобы сделать это горячим потоком.Я создал модульный тест, который запускает 2 потока, которые делают это на возвращенном Flux
:
flux.log().handle((s, sink) -> {
LOGGER.info("{}", s.getMachineSpeed().getCurrent());
}).subscribe();
В этом случае я вижу, как оба потока выводят одно и то же значение каждую секунду.
Однако, когда я открываю 2 вкладки браузера, я не вижу одинаковые значения на обеих моих веб-страницах.Чем больше клиентов веб-сокетов подключаются, тем больше значений друг от друга (поэтому каждое значение из исходного Flux, похоже, отправляется другому клиенту, а не всем).