Ошибка подключения ручки реактора изящно - PullRequest
0 голосов
/ 27 апреля 2020

У меня проблема с обработкой ошибок в реакторе. Мое весеннее загрузочное приложение, слушаю сообщения RabbitMQ и после получения сообщения сохраняю его в MongoDB. Мой вариант использования подходит для конвейера потока и есть фильтр.

как в «фильтре», так и в «подписке», я соединяюсь с пн go. Во время исключения MongoTimeOut, главным образом по истечении времени ожидания соединения, я хочу отправить сообщение «NACK». Это требует использования объекта Channel.

Вопрос1: Есть ли способ привязать объект Channel, не загромождая весь конвейер? Вопрос2: Есть ли способ выбросить Flux.Error из фильтра?

Flux.just(system)
    .flatMap(getSystemData) 
    .zipWith(Mono.just(additionalData)) 
    .flatMap(getFluxData) 
    .zipWith(Mono.just(channel)) //RabbitMQ Channel
    .filter(tuple2 -> {
        MyObject myObject = tuple2.getT1();
        Channel channel1 = tuple2.getT2();
        try {
            List<MyData> dataList = dataRepository.findBySystem(myObject.getId());
            dismissMessage = dataList.stream().anyMatch(data -> myObject.equals(data));
        } catch (MongoTimeoutException mongoTimeoutException) {
            log.error("Timeout in mongo {}", mongoTimeoutException.getMessage());
            //return Flux.error((Throwable) mongoTimeoutException); It is not possible!!!
            channel.basicNack(tag, false, true);//I don't want to do it here, instead at onFailed
        } catch (MongoException mongoException) {
            log.error("Exception in mongo {}", mongoException.getMessage());
            //return Flux.error((Throwable) mongoTimeoutException); It is not possible!!!
            channel.basicNack(tag, false, true);//I don't want to do it here, instead at onFailed
        }
        if(dismissMessage){
            log.debug("Duplicate mesage received: {}", myObject);
        }
        return !dismissMessage;
    })
    .subscribe(this::onCompleted, this::onFailed);
...