Как протолкнуть данные через реактивную веб-сокет с помощью Spring в ответ на запрос? - PullRequest
1 голос
/ 06 марта 2019

Я начинаю с реактивных веб-сокетов, используя Spring Boot 2.1.3. Я создал WebSocketHandler реализацию, подобную этой:

@Override
public Mono<Void> handle(WebSocketSession session) {

Flux<EfficiencyData> flux = service.subscribeToEfficiencyData(1);
    var publisher = flux.map( o -> {
        try {
            return objectMapper.writeValueAsString(o);
        } catch (JsonProcessingException e) {
            e.printStackTrace();
            return null;
        }
    }).map(session::textMessage)
      .delayElements(Duration.ofSeconds(1));
    return session.send(publisher);
}

Это работает, если я подключаюсь, я сериализуюсь EfficiencyData каждую секунду в моем клиенте websocket.

Однако я хочу отреагировать на запрос, поступающий из веб-сокета, чтобы сообщить service, для какого идентификатора мне нужны данные. Мне удалось получить информацию запроса следующим образом:

@Override
public Mono<Void> handle(WebSocketSession session) {

    return session.send(session.receive().map(webSocketMessage -> {
        int id = Integer.parseInt(webSocketMessage.getPayloadAsText());

        return session.textMessage("Subscribing with id " + id);
    }));

Теперь я понятия не имею, как объединить эти 2 реализации?

Я надеялся сделать что-то вроде этого:

@Override
public Mono<Void> handle(WebSocketSession session) {

    return session.send(session.receive().map(webSocketMessage -> {
        int id = Integer.parseInt(webSocketMessage.getPayloadAsText());

        Flux<EfficiencyData> flux = service.subscribeToEfficiencyData(id);
        var publisher = flux.map( o -> {
            try {
                return objectMapper.writeValueAsString(o);
            } catch (JsonProcessingException e) {
                e.printStackTrace();
                return null;
            }
        }).map(session::textMessage)
                            .delayElements(Duration.ofSeconds(1));
        return publisher; //Does not compile
    }));

Но это не компилируется, поскольку publisher является Flux<WebSocketMessage> и должно быть Publisher<WebSocketMessage>. Как с этим обращаться?

EDIT:

Следуя примеру Javadoc WebSocketHandler, я попробовал это:

@Override
public Mono<Void> handle(WebSocketSession session) {
    Flux<EfficiencyData> flux =
            session.receive()
                   .map(webSocketMessage -> Integer.parseInt(webSocketMessage.getPayloadAsText()))
                   .concatMap(service::subscribeToEfficiencyData);
    Mono<Void> input = flux.then();
    Mono<Void> output = session.send(flux.map(data -> session.textMessage(data.toString()))).then();
    return Mono.zip(input, output).then();
}

Но это просто немедленно отключает клиента websocket, ничего не делая.

1 Ответ

0 голосов
/ 06 марта 2019

Используйте flatMap или concatMap, чтобы сгладить возвращаемое издательство.

. Чтобы устранить проблему, вы должны использовать операторы, которые позволяют выравнивать возвращаемое значение.Например,

@Override
public Mono<Void> handle(WebSocketSession session) {

    return session.send(
       session.receive()
              .flatMap(webSocketMessage -> {
                  int id = Integer.parseInt(webSocketMessage.getPayloadAsText());

                  Flux<EfficiencyData> flux = service.subscribeToEfficiencyData(id);
                  var publisher = flux
                      .<String>handle((o, sink) -> {
                         try {
                            sink.next(objectMapper.writeValueAsString(o));
                         } catch (JsonProcessingException e) {
                            e.printStackTrace();
                            return; // null is prohibited in reactive-streams
                         }
                      })
                      .map(session::textMessage)
                      .delayElements(Duration.ofSeconds(1));

                  return publisher;
              })
    );
}

Ключевые выводы

  1. Если тип возвращаемого значения - поток, используйте flatMap или concatMap (см. Разницу здесь
  2. Никогда не возвращает Null. В реактивных потоках Null является запрещенным значением (см. Правила спецификации здесь
  3. Когда отображение может закончиться null -> usehandle оператор. Подробнее см. здесь
...