У меня есть цепочка с двумя операторами onErrorContinue, что-то вроде этого:
Flux.from(publisher).transform(apply(pub)).onErrorContinue((e, o) -> callback);
public Publisher<A> apply(Publisher<A> pub) {
return pub -> pub.doOnNext(throwException()).onErrorContinue((e, o) -> callback);
}
И когда я получаю сообщение об ошибке, выполняются оба обратных вызова. Чтобы избежать этого:
return publisher -> from(publisher)
// This replaces the onErrorContinue key if it exists, to prevent it from being propagated.
.compose(pub -> pub.subscriberContext(context -> {
Optional<Object> onErrorStrategy = context.getOrEmpty(KEY_ON_NEXT_ERROR_STRATEGY);
if (onErrorStrategy.isPresent()
&& onErrorStrategy.get().toString().contains(ON_NEXT_FAILURE_STRATEGY)) {
BiFunction<Throwable, Object, Throwable> onErrorContinue = (e, o) -> null;
return context.put(KEY_ON_NEXT_ERROR_STRATEGY, onErrorContinue);
}
return context;
}));
}
Есть ли лучший способ сделать это?
Здесь есть контрольный пример для его воспроизведения:
@Test
public void manuelitaViviaEnPehuajo() {
final AtomicReference<FluxSink<String>> sinkRef = new AtomicReference<>();
final AtomicReference<FluxSink<String>> other = new AtomicReference<>();
Flux.<String>create(sink -> sinkRef.set(sink)).transform(pub -> apply(pub, other)).mergeWith(Flux.create(other::set)).onErrorContinue((e, o) -> System.out.println("outside callback")).subscribe();
sinkRef.get().next("Hello!");
}
private Publisher<String> apply(Publisher<String> pub, AtomicReference<FluxSink<String>> other) {
return Flux.from(pub).doOnNext(o -> {
throw Exceptions.propagate(new MyException());
}).onErrorContinue((e, o) -> {
System.out.println("inside callback");
other.get().error(e);
});
}
private class MyException extends Exception {}