Реактивные веб-сокеты Spring 5: клиенты не получают одинаковые данные из горячего потока - PullRequest
1 голос
/ 06 марта 2019

У меня есть это в моей WebSocketHandler реализации:

@Override
public Mono<Void> handle(WebSocketSession session) {

    return session.send(
       session.receive()
              .flatMap(webSocketMessage -> {
                  int id = Integer.parseInt(webSocketMessage.getPayloadAsText());

                  Flux<EfficiencyData> flux = service.subscribeToEfficiencyData(id);
                  var publisher = flux
                      .<String>handle((o, sink) -> {
                         try {
                            sink.next(objectMapper.writeValueAsString(o));
                         } catch (JsonProcessingException e) {
                            e.printStackTrace();                               
                         }
                      })
                      .map(session::textMessage);

                  return publisher;
              })
    );
}

Flux<EfficiencyData> в настоящее время генерируется для тестирования в службе следующим образом:

public Flux<EfficiencyData> subscribeToEfficiencyData(long weavingLoomId) {
    return Flux.interval(Duration.ofSeconds(1))
               .map(aLong -> {
                   longAdder.increment();
                   return new EfficiencyData(new MachineSpeed(
                           RotationSpeed.ofRpm(longAdder.intValue()),
                           RotationSpeed.ofRpm(0),
                           RotationSpeed.ofRpm(400)));
               }).publish().autoConnect();
}

Я использую publish().autoConnect() чтобы сделать это горячим потоком.Я создал модульный тест, который запускает 2 потока, которые делают это на возвращенном Flux:

flux.log().handle((s, sink) -> {
            LOGGER.info("{}", s.getMachineSpeed().getCurrent());
        }).subscribe();

В этом случае я вижу, как оба потока выводят одно и то же значение каждую секунду.

Однако, когда я открываю 2 вкладки браузера, я не вижу одинаковые значения на обеих моих веб-страницах.Чем больше клиентов веб-сокетов подключаются, тем больше значений друг от друга (поэтому каждое значение из исходного Flux, похоже, отправляется другому клиенту, а не всем).

1 Ответ

2 голосов
/ 07 марта 2019

удалось исправить это благодаря Брайану Клозелу в твиттере .

Проблема в том, что для каждого подключающегося клиента веб-сокета я вызываю метод service.subscribeToEfficiencyData(id), который возвращает новый Flux каждый раз, когда он вызывается. Поэтому, конечно, эти независимые Flux'ы не разделяются между различными клиентами веб-сокетов.

Чтобы устранить эту проблему, я создаю экземпляр Flux в конструкторе или метод PostConstruct моего сервиса, поэтому subscribeToEfficiencyData каждый раз возвращает один и тот же экземпляр Flux.

Обратите внимание, что .publish().autoConnect() на Flux остается важным, потому что без этого клиенты websocket снова увидят другие значения!

...