Как я могу вернуть другой ServerResponse, если данные потока Flux имеют статус ошибки - PullRequest
0 голосов
/ 17 октября 2019

Я реализую конечную точку REST с помощью WebFlux, и у меня нет проблем с базовыми операциями, но есть операция, которой я не знаю, как управлять. Я хотел бы вернуть объекты PriceMessage, связанные с объектом Event, поэтому, если событие существует, конечная точка возвращает ServerResponse.ok (), но ServerResponse.notFound () должен быть возвращен, если событие не существует.

InНа уровне хранилища существует следующий метод, который возвращает Mono, представляющий код, если и Event с eventId существует, и Mono.empty (), если его нет:

@Override
public Mono<Integer> getSportsEventId(long eventId) {
  Optional<SourceEventDto> optionalSourceEvent =
    springJpaSourceEventRepository.findByEventId(eventId);

  Mono<Integer> result = Mono.empty();

  if (optionalSourceEvent.isPresent()) {
    result = Mono.just(new Integer(optionalSourceEvent.get().getSourceId()));
  }

  return result;
}

Затем уровень обслуживания возвращает Flux. которая устанавливает ошибку, если событие не было найдено, или подключается к Redis для получения цен, связанных с sportsEventId:

@Override
public Flux<PriceMessage> getPrices(long eventId) {
  return
    // get the sportsEventId
    sourceEventRepository.getSportsEventId(eventId)
      // notify the event does not exist
      .switchIfEmpty(Mono.error(new IllegalStateException("Event " + eventId + " does not exist")))
      // get the related PriceEntity objects
      .flatMapMany(priceRepository::findBySporsEventId)
      // transform to PriceMessage
      .map(priceMessageFactory::from);
}

Этот код проверен модулем, и я также отладил его, чтобы убедиться, что flatMapMany и части картыне вызывается, если в потоке установлена ​​ошибка.

Наконец, на уровне REST есть этот код, счастливый путь:

public Mono<ServerResponse> getPricesByEventId(ServerRequest request) {
  String eventIdParam = request.pathVariable("eventId");

  // call the service layer
  Flux<PriceMessage> prices = priceService.getPrices(eventId);

  return
    ServerResponse.ok()
      .contentType(MediaType.APPLICATION_STREAM_JSON)
      .body(prices, PriceMessage.class);
}

Это прекрасно работает, но я не знаю, каквернуть ошибку 404, если поток цен содержит ошибку, потому что событие не было найдено, установив в теле сообщение об ошибке, что-то вроде этого:

ServerResponse.status(HttpStatus.NOT_FOUND)
  .contentType(MediaType.APPLICATION_JSON_UTF8)
  .syncBody(<error_message_from_flux>);

Псевдокод будет примерно таким, но я не могу перевести его в реактивный:

if (!prices.isError()) then
  return
    ServerResponse.ok()
      .contentType(MediaType.APPLICATION_STREAM_JSON)
      .body(prices, PriceMessage.class);
else
  return
    ServerResponse.status(HttpStatus.NOT_FOUND)
     .contentType(MediaType.APPLICATION_JSON_UTF8)
     .syncBody(<error_message_from_flux>);
end

Любая помощь будет принята с благодарностью.

Ответы [ 2 ]

2 голосов
/ 17 октября 2019

Вы можете использовать onErrorResume для создания ответа об ошибке в случае возникновения ошибки.

 return priceService.getPrices(eventId).flatMap(prices -> ServerResponse.ok()
          .contentType(MediaType.APPLICATION_STREAM_JSON)
          .body(prices, PriceMessage.class))
       .onErrorResume(err -> ServerResponse.status(HttpStatus.NOT_FOUND).body({Your Error Body Here}).build());
0 голосов
/ 17 октября 2019

Вы можете использовать выбор для обработки этого при регистрации маршрутов.

@Bean
public RouterFunction<ServerResponse> route(PriceMessageHandler handler) {
  return RouterFunctions.route().GET("/",
      request -> ServerResponse.ok().body(handler.handle(request), PriceMessage.class))
      .onError(Exception.class, (e, a) -> ServerResponse.status(HttpStatus.NOT_FOUND).build())
      .build();
}

Или вы можете использовать onErrorResume на ServerResponse:

return ServerResponse.ok().body(priceService.getPrices(eventId), PriceMessage.class).onErrorResume((e) -> ServerResponse.notFound().build());

Надеюсь, это поможет.

...