Reactor, как отладить OverflowException? - PullRequest
0 голосов
/ 09 октября 2018

Я пытаюсь найти способ понять / отладить, почему у меня случайно есть эта трассировка стека:

reactor.core.Exceptions$OverflowException: Could not emit buffer due to lack of requests
at reactor.core.Exceptions.failWithOverflow(Exceptions.java:215)
at reactor.core.publisher.FluxBufferPredicate$BufferPredicateSubscriber.emit(FluxBufferPredicate.java:292)
at reactor.core.publisher.FluxBufferPredicate$BufferPredicateSubscriber.onNextNewBuffer(FluxBufferPredicate.java:251)
at reactor.core.publisher.FluxBufferPredicate$BufferPredicateSubscriber.tryOnNext(FluxBufferPredicate.java:205)
at reactor.core.publisher.FluxBufferPredicate$BufferPredicateSubscriber.onNext(FluxBufferPredicate.java:180)
at reactor.core.publisher.FluxMap$MapConditionalSubscriber.onNext(FluxMap.java:201)
at reactor.core.publisher.FluxConcatMap$ConcatMapImmediate.innerNext(FluxConcatMap.java:271)
at reactor.core.publisher.FluxConcatMap$ConcatMapInner.onNext(FluxConcatMap.java:803)
at reactor.core.publisher.FluxIterable$IterableSubscription.slowPath(FluxIterable.java:232)
at reactor.core.publisher.FluxIterable$IterableSubscription.request(FluxIterable.java:190)
at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.set(Operators.java:1444)
at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.onSubscribe(Operators.java:1318)
at reactor.core.publisher.FluxIterable.subscribe(FluxIterable.java:128)
at reactor.core.publisher.FluxIterable.subscribe(FluxIterable.java:61)
at reactor.core.publisher.Flux.subscribe(Flux.java:6873)

Означает ли это, что производитель быстрее, чем потребитель?Мой шаблон, вероятно, не является стандартным и выглядит следующим образом (здесь упрощенно):

Flux<Pair<Person, String>> auto = getPersons() // REST GET endpoint
        .map(p -> {
            // In my real-life example, the operation done here is quiet expensive.
            Person newP = new Person(p.name, p.age + 10);
            return new Pair<>(newP, "The new age of " + newP.name + " is now " + newP.age);
        })
        .publish()
        .autoConnect(2);

    Flux<Person> personsToSave = auto.map(e -> e.first);
    Flux<String> auditToSave = auto.map(e -> e.second);

    Mono.when(
            savePersons(personsToSave), // REST POST endpoint
            saveAudit(auditToSave))     // REST POST endpoint
        .doOnError(e -> System.err.println(e.getMessage()))
        .block();

Hooks.onOperatorDebug () или log () мне не очень помогают.У меня нет проблемы, если я удалю publish () и сохраню только людей ИЛИ аудит.

Может кто-нибудь дать мне, как провести более точное расследование?(или идея для решения проблемы) Reactor 3.1.6

...