Две стратегии onErrorContinue, вызываемые при слиянии с - PullRequest
0 голосов
/ 10 января 2020

У меня есть цепочка с двумя операторами onErrorContinue, что-то вроде этого:

Flux.from(publisher).transform(apply(pub)).onErrorContinue((e, o) -> callback);

public Publisher<A> apply(Publisher<A> pub) {
    return pub -> pub.doOnNext(throwException()).onErrorContinue((e, o) -> callback);
}

И когда я получаю сообщение об ошибке, выполняются оба обратных вызова. Чтобы избежать этого:

    return publisher -> from(publisher)

        // This replaces the onErrorContinue key if it exists, to prevent it from being propagated.
        .compose(pub -> pub.subscriberContext(context -> {

          Optional<Object> onErrorStrategy = context.getOrEmpty(KEY_ON_NEXT_ERROR_STRATEGY);

          if (onErrorStrategy.isPresent()
              && onErrorStrategy.get().toString().contains(ON_NEXT_FAILURE_STRATEGY)) {

            BiFunction<Throwable, Object, Throwable> onErrorContinue = (e, o) -> null;
            return context.put(KEY_ON_NEXT_ERROR_STRATEGY, onErrorContinue);
          }
          return context;
        }));
    }

Есть ли лучший способ сделать это?

Здесь есть контрольный пример для его воспроизведения:

@Test
public void manuelitaViviaEnPehuajo() {

    final AtomicReference<FluxSink<String>> sinkRef = new AtomicReference<>();
    final AtomicReference<FluxSink<String>> other = new AtomicReference<>();

    Flux.<String>create(sink -> sinkRef.set(sink)).transform(pub -> apply(pub, other)).mergeWith(Flux.create(other::set)).onErrorContinue((e, o) -> System.out.println("outside callback")).subscribe();

    sinkRef.get().next("Hello!");
}

private Publisher<String> apply(Publisher<String> pub, AtomicReference<FluxSink<String>> other) {
    return Flux.from(pub).doOnNext(o -> {
        throw Exceptions.propagate(new MyException());
    }).onErrorContinue((e, o) -> {
        System.out.println("inside callback");
        other.get().error(e);
    });
}

private class MyException extends Exception {}
...