Контроллер Webflux возвращает объект вместо Mono - PullRequest
0 голосов
/ 06 августа 2020

Здравствуйте, я новичок в Webflux. Я следую руководству по созданию реактивных микросервисов. В моем проекте я столкнулся со следующей проблемой.

Я хочу создать crud api для службы продукта, и следующий метод Create

@Override
public Product createProduct(Product product) {

    Optional<ProductEntity> productEntity = Optional.ofNullable(repository.findByProductId(product.getProductId()).block());
             productEntity.ifPresent((prod -> {
                  throw new InvalidInputException("Duplicate key, Product Id: " + product.getProductId());
              }));

    ProductEntity entity = mapper.apiToEntity(product);
    Mono<Product> newProduct = repository.save(entity)
        .log()
        .map(mapper::entityToApi);

    return newProduct.block();
}

Проблема в том, что когда я вызываю это метод от почтальона я получаю сообщение об ошибке "block () / blockFirst () / blockLast () блокируются, что не поддерживается в потоке response-http-nio-3" , но когда я использую StreamListener, этот вызов работает нормально. Слушатель потока получает события от канала rabbit-mq.

StreamListener

@EnableBinding(Sink.class)
public class MessageProcessor {

    private final ProductService productService;

    public MessageProcessor(ProductService productService) {
        this.productService = productService;
    }

    @StreamListener(target = Sink.INPUT)
    public void process(Event<Integer, Product> event) {

       
        switch (event.getEventType()) {

            case CREATE:
                Product product = event.getData();
                LOG.info("Create product with ID: {}", product.getProductId());
                productService.createProduct(product);
                break;

   
            default:
                String errorMessage = "Incorrect event type: " + event.getEventType() + ", expected a CREATE or DELETE event";
                LOG.warn(errorMessage);
                throw new EventProcessingException(errorMessage);
        }
    }
}

У меня два вопроса.

  1. Почему это работает с StreamListener, а не с простой запрос?
  2. Есть ли в webflux правильный способ вернуть объект Mono, или мы всегда должны возвращать Mono?

1 Ответ

2 голосов
/ 06 августа 2020

Ваш метод создания хотел бы выглядеть примерно так, и вы хотели бы вернуть Mono<Product> из вашего контроллера, а не только объект.

  public Mono<Product> createProduct(Product product) {
    return repository.findByProductId(product.getProductId())
        .switchIfEmpty(Mono.just(mapper.apiToEntity(product)))
        .flatMap(repository::save)
        .map(mapper::entityToApi);
  }

Как прокомментировал @Thomas, вы нарушаете некоторые из основы реактивного кодирования и невозможность получения преимуществ от использования block (), и следует прочитать об этом подробнее. Например, реактивный репозиторий mon go, который вы используете, будет возвращать Mono, который имеет свои собственные методы для обработки, если он пуст, без необходимости использования Optional, как показано выше.

EDIT для сопоставления с ошибкой, если объект уже существует, в противном случае сохранить

  public Mono<Product> createProduct(Product product) {
    return repository.findByProductId(product.getProductId())
        .hasElement()
        .filter(exists -> exists)
        .flatMap(exists -> Mono.error(new Exception("my exception")))
        .then(Mono.just(mapper.apiToEntity(product)))
        .flatMap(repository::save)
        .map(mapper::entityToApi);
  }
...