Получение исключения при выполнении block () для объекта Mono, которое я получил от объекта ReactiveMongoRepository - PullRequest
1 голос
/ 11 октября 2019

У меня есть служба, которая передает данные во вторую службу, которая получает поток объектов и сохраняет их в моем MongoDB. внутри моей функции подписки на объект Flux, который я получаю от службы потоковой передачи, я использую метод save из интерфейса ReactiveMongoRepository. когда я пытаюсь использовать функцию блокировки и получать данные, я получаю следующую ошибку:

2019-10-11 13:30:38.559  INFO 19584 --- [localhost:27017] org.mongodb.driver.connection            : Opened connection [connectionId{localValue:1, serverValue:25}] to localhost:27017
2019-10-11 13:30:38.566  INFO 19584 --- [localhost:27017] org.mongodb.driver.cluster               : Monitor thread successfully connected to server with description ServerDescription{address=localhost:27017, type=STANDALONE, state=CONNECTED, ok=true, version=ServerVersion{versionList=[4, 0, 1]}, minWireVersion=0, maxWireVersion=7, maxDocumentSize=16777216, logicalSessionTimeoutMinutes=30, roundTripTimeNanos=6218300}
2019-10-11 13:30:39.158  INFO 19584 --- [ctor-http-nio-4] quote-monitor-service                    : onNext(Quote(id=null, ticker=AAPL, price=164.8, instant=2019-10-11T10:30:38.800Z))
2019-10-11 13:30:39.411  INFO 19584 --- [ctor-http-nio-4] quote-monitor-service                    : cancel()
2019-10-11 13:30:39.429  INFO 19584 --- [ntLoopGroup-2-2] org.mongodb.driver.connection            : Opened connection [connectionId{localValue:3, serverValue:26}] to localhost:27017
2019-10-11 13:30:39.437  WARN 19584 --- [ctor-http-nio-4] io.netty.util.ReferenceCountUtil         : Failed to release a message: DefaultHttpContent(data: PooledSlicedByteBuf(freed), decoderResult: success)

io.netty.util.IllegalReferenceCountException: refCnt: 0, decrement: 1
    at io.netty.util.internal.ReferenceCountUpdater.toLiveRealRefCnt(ReferenceCountUpdater.java:74) ~[netty-common-4.1.39.Final.jar:4.1.39.Final]
    at io.netty.util.internal.ReferenceCountUpdater.release(ReferenceCountUpdater.java:138) ~[netty-common-4.1.39.Final.jar:4.1.39.Final]
    at 
reactor.core.Exceptions$ErrorCallbackNotImplemented: java.lang.IllegalStateException: block()/blockFirst()/blockLast() are blocking, which is not supported in thread reactor-http-nio-4
Caused by: java.lang.IllegalStateException: block()/blockFirst()/blockLast() are blocking, which is not supported in thread reactor-http-nio-4
    at reactor.core.publisher.BlockingSingleSubscriber.blockingGet(BlockingSingleSubscriber.java:77) ~[reactor-core-3.2.12.RELEASE.jar:3.2.12.RELEASE]
    at reactor.core.publisher.Mono.block(Mono.java:1494) ~[reactor-core-3.2.12.RELEASE.jar:3.2.12.RELEASE]
    at

мой код:

stockQuoteClient.getQuoteStream()
                .log("quote-monitor-service")
                .subscribe(quote -> {
                    Mono<Quote> savedQuote = quoteRepository.save(quote);
                    System.out.println("I saved a quote! Id: " +savedQuote.block().getId());
                });

после некоторого копания мне удается заставить его работатьно я не понимаю, почему это работает сейчас. новый код:

stockQuoteClient.getQuoteStream()
                .log("quote-monitor-service")
                .subscribe(quote -> {
                       Mono<Quote> savedQuote = quoteRepository.insert(quote);
                       savedQuote.subscribe(result ->
                                 System.out.println("I saved a quote! Id :: " + result.getId()));
    });

определение block (): подписаться на этот Mono и блокировать на неопределенный срок до получения следующего сигнала.

определение subscribe (): подписаться на этоMono и запрос неограниченного спроса.

Может кто-нибудь помочь мне понять, почему блок не работал и подписка работала? что мне здесь не хватает?

1 Ответ

1 голос
/ 11 октября 2019

Блокировка плохая, поскольку она связывает поток, ожидающий ответа. Это очень плохо в реактивном фреймворке, который имеет в своем распоряжении несколько потоков и спроектирован так, что нет из них должны быть излишне заблокированы.

Это оченьТо, что реактивные структуры призваны избегать, поэтому в этом случае это просто останавливает вас:

block () / blockFirst () / blockLast () блокируют, что не поддерживается в реакторе потока-http-nio-4

Ваш новый код, напротив, работает асинхронно. Поток не блокируется, так как на самом деле ничего не происходит до тех пор, пока хранилище не вернет значение (и тогда лямбда, переданная вами в savedQuote.subscribe(), будет выполнена, распечатав результат на консоль.)

Однако,новый код все еще не является оптимальным / нормальным с точки зрения реактивных потоков, поскольку вы выполняете всю свою логику в своем методе подписки. Обычно мы выполняем серию вызовов flatMap / map для преобразования элементов в потоке и используем doOnNext() для побочных эффектов (таких как вывод значения):

stockQuoteClient.getQuoteStream()
            .log("quote-monitor-service")
            .flatMap(quoteRepository::insert)
            .doOnNext(result -> System.out.println("I saved a quote! Id :: " + result.getId())))
            .subscribe();

ЕслиВы выполняете серьезную работу с реакторными / реактивными потоками, в общем, стоило бы их прочитать. Они очень эффективны для неблокирующей работы, но для них требуется иной способ мышления (и кодирования), чем для более «стандартной» Java.

...