Я делаю краткую версию того, что показано здесь: (https://vertx.io/docs/vertx-amqp-client/java/#_creating_a_receiver).
По сути, приведенный ниже код получает сообщения всякий раз, когда они принимаются по соединению. За кадром я симулирую разорванное соединение, останавливая службу брокера сообщений. В результате код AMQP выбрасывает NPE. Я хотел бы поймать NPE и отправить лучшую ошибку подписчику.
Когда происходит NPE, кажется, я не могу уловить ошибку, поэтому я не уверен, как это сделать sh.
public Publisher<Object> receiveAmqpMessages(AmqpConnection connection, String address) {
return connection
.rxCreateReceiver(address)
.flatMapPublisher(receiver ->
receiver.toObservable()
.doFinally(receiver::rxClose)
.onErrorReturn(e-> {
System.out.println("I never see this message");
return null;
}.toFlowable(BackpressureStrategy.BUFFER)
);
}
Я попытался обернуть его в try / catch, как это так, но он не go в блок catch
public Publisher<Object> receiveAmqpMessages(AmqpConnection connection, String address) {
try {
// same code shown above
} catch (NullPointerException e) {
System.out.println("I never see this message either");
return Observable.error(new Exception("foo")).toFlowable(BackpressureStrategy.BUFFER);
}
}
FWIW консоль выглядит так (Примечание: консоль никогда не выводит ничего, что ссылается на мой собственный файл класса):
SEVERE: Unhandled Exception
java.lang.NullPointerException
at io.vertx.ampq.impl.AmqpConnectionImpl.lambda$createReceiver$8(Line 251)
...
Если вы откроете их AmqpConnectionImpl и go для этой строки кода, вы увидите
ProtonReceiver receiver = connection.get().createReceiver(address,opts)
, который вызывает NPE, так как connection.get () имеет значение null