У меня проблема с обработкой ошибок в реакторе. Мое весеннее загрузочное приложение, слушаю сообщения RabbitMQ и после получения сообщения сохраняю его в MongoDB. Мой вариант использования подходит для конвейера потока и есть фильтр.
как в «фильтре», так и в «подписке», я соединяюсь с пн go. Во время исключения MongoTimeOut, главным образом по истечении времени ожидания соединения, я хочу отправить сообщение «NACK». Это требует использования объекта Channel.
Вопрос1: Есть ли способ привязать объект Channel, не загромождая весь конвейер? Вопрос2: Есть ли способ выбросить Flux.Error из фильтра?
Flux.just(system)
.flatMap(getSystemData)
.zipWith(Mono.just(additionalData))
.flatMap(getFluxData)
.zipWith(Mono.just(channel)) //RabbitMQ Channel
.filter(tuple2 -> {
MyObject myObject = tuple2.getT1();
Channel channel1 = tuple2.getT2();
try {
List<MyData> dataList = dataRepository.findBySystem(myObject.getId());
dismissMessage = dataList.stream().anyMatch(data -> myObject.equals(data));
} catch (MongoTimeoutException mongoTimeoutException) {
log.error("Timeout in mongo {}", mongoTimeoutException.getMessage());
//return Flux.error((Throwable) mongoTimeoutException); It is not possible!!!
channel.basicNack(tag, false, true);//I don't want to do it here, instead at onFailed
} catch (MongoException mongoException) {
log.error("Exception in mongo {}", mongoException.getMessage());
//return Flux.error((Throwable) mongoTimeoutException); It is not possible!!!
channel.basicNack(tag, false, true);//I don't want to do it here, instead at onFailed
}
if(dismissMessage){
log.debug("Duplicate mesage received: {}", myObject);
}
return !dismissMessage;
})
.subscribe(this::onCompleted, this::onFailed);