Я пытаюсь найти способ понять / отладить, почему у меня случайно есть эта трассировка стека:
reactor.core.Exceptions$OverflowException: Could not emit buffer due to lack of requests
at reactor.core.Exceptions.failWithOverflow(Exceptions.java:215)
at reactor.core.publisher.FluxBufferPredicate$BufferPredicateSubscriber.emit(FluxBufferPredicate.java:292)
at reactor.core.publisher.FluxBufferPredicate$BufferPredicateSubscriber.onNextNewBuffer(FluxBufferPredicate.java:251)
at reactor.core.publisher.FluxBufferPredicate$BufferPredicateSubscriber.tryOnNext(FluxBufferPredicate.java:205)
at reactor.core.publisher.FluxBufferPredicate$BufferPredicateSubscriber.onNext(FluxBufferPredicate.java:180)
at reactor.core.publisher.FluxMap$MapConditionalSubscriber.onNext(FluxMap.java:201)
at reactor.core.publisher.FluxConcatMap$ConcatMapImmediate.innerNext(FluxConcatMap.java:271)
at reactor.core.publisher.FluxConcatMap$ConcatMapInner.onNext(FluxConcatMap.java:803)
at reactor.core.publisher.FluxIterable$IterableSubscription.slowPath(FluxIterable.java:232)
at reactor.core.publisher.FluxIterable$IterableSubscription.request(FluxIterable.java:190)
at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.set(Operators.java:1444)
at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.onSubscribe(Operators.java:1318)
at reactor.core.publisher.FluxIterable.subscribe(FluxIterable.java:128)
at reactor.core.publisher.FluxIterable.subscribe(FluxIterable.java:61)
at reactor.core.publisher.Flux.subscribe(Flux.java:6873)
Означает ли это, что производитель быстрее, чем потребитель?Мой шаблон, вероятно, не является стандартным и выглядит следующим образом (здесь упрощенно):
Flux<Pair<Person, String>> auto = getPersons() // REST GET endpoint
.map(p -> {
// In my real-life example, the operation done here is quiet expensive.
Person newP = new Person(p.name, p.age + 10);
return new Pair<>(newP, "The new age of " + newP.name + " is now " + newP.age);
})
.publish()
.autoConnect(2);
Flux<Person> personsToSave = auto.map(e -> e.first);
Flux<String> auditToSave = auto.map(e -> e.second);
Mono.when(
savePersons(personsToSave), // REST POST endpoint
saveAudit(auditToSave)) // REST POST endpoint
.doOnError(e -> System.err.println(e.getMessage()))
.block();
Hooks.onOperatorDebug () или log () мне не очень помогают.У меня нет проблемы, если я удалю publish () и сохраню только людей ИЛИ аудит.
Может кто-нибудь дать мне, как провести более точное расследование?(или идея для решения проблемы) Reactor 3.1.6