Я делаю реактивный / флюс / моно отдых. Backend - Oracle, и используется rxjava2-jdb c.
Как обойти эту ошибку блокировки?
Я изучаю rx на примере, поэтому было бы здорово узнать концептуальные детали, которые предотвращают манипулирование списком, которое кажется рутинным.
- Репозиторий возвращает поток из rx / базы данных:
">
Обработчик пытается добавить этот список / поток в другой объект Protobuf SearchResponse, но не удается.
Транс в коротком стеке:
causes error">
Полная трассировка стека:
2020-02-23T10:43:37,967 INFO [main] o.s.d.r.c.RepositoryConfigurationDelegate: Bootstrapping Spring Data JDBC repositories in DEFAULT mode.
2020-02-23T10:43:38,046 INFO [main] o.s.d.r.c.RepositoryConfigurationDelegate: Finished Spring Data repository scanning in 69ms. Found 0 JDBC repository interfaces.
2020-02-23T10:43:38,988 INFO [main] c.z.h.HikariDataSource: HikariPool-1 - Starting...
2020-02-23T10:43:39,334 INFO [main] c.z.h.HikariDataSource: HikariPool-1 - Start completed.
2020-02-23T10:43:40,196 INFO [main] o.s.b.w.e.n.NettyWebServer: Netty started on port(s): 8080
2020-02-23T10:43:40,199 INFO [main] o.s.b.StartupInfoLogger: Started App in 4.298 seconds (JVM running for 5.988)
2020-02-23T10:44:01,307 ERROR [reactor-http-nio-2] o.s.c.l.CompositeLog: [d806b05e] 500 Server Error for HTTP GET "/webflux/customers"
java.lang.IllegalStateException: Iterating over a toIterable() / toStream() is blocking, which is not supported in thread reactor-http-nio-2
at reactor.core.publisher.BlockingIterable$SubscriberIterator.hasNext(BlockingIterable.java:160)
Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException:
Error has been observed at the following site(s):
|_ checkpoint ⇢ HTTP GET "/webflux/customers" [ExceptionHandlingWebHandler]
Stack trace:
at reactor.core.publisher.BlockingIterable$SubscriberIterator.hasNext(BlockingIterable.java:160)
at com.google.protobuf.AbstractMessageLite$Builder.addAllCheckingNulls(AbstractMessageLite.java:372)
at com.google.protobuf.AbstractMessageLite$Builder.addAll(AbstractMessageLite.java:434)
at pn.api.protobuf.Proto$SearchResponse$Builder.addAllCustomers(Proto.java:3758)
at pn.api.controller.AppHandler.getAllCustomers(AppHandler.java:24)
at org.springframework.web.reactive.function.server.support.HandlerFunctionAdapter.handle(HandlerFunctionAdapter.java:61)
at org.springframework.web.reactive.DispatcherHandler.invokeHandler(DispatcherHandler.java:161)
at org.springframework.web.reactive.DispatcherHandler.lambda$handle$1(DispatcherHandler.java:146)
at reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:118)
at reactor.core.publisher.FluxSwitchIfEmpty$SwitchIfEmptySubscriber.onNext(FluxSwitchIfEmpty.java:67)
at reactor.core.publisher.MonoNext$NextSubscriber.onNext(MonoNext.java:76)
at reactor.core.publisher.FluxConcatMap$ConcatMapImmediate.innerNext(FluxConcatMap.java:274)
at reactor.core.publisher.FluxConcatMap$ConcatMapInner.onNext(FluxConcatMap.java:851)
at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:121)
at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.onNext(FluxPeekFuseable.java:203)
at reactor.core.publisher.Operators$ScalarSubscription.request(Operators.java:2199)
at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.request(FluxPeekFuseable.java:137)
at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.request(FluxMapFuseable.java:162)
at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.set(Operators.java:2007)
at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.onSubscribe(Operators.java:1881)
at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onSubscribe(FluxMapFuseable.java:90)
at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.onSubscribe(FluxPeekFuseable.java:171)
at reactor.core.publisher.MonoJust.subscribe(MonoJust.java:54)
at reactor.core.publisher.Mono.subscribe(Mono.java:4105)
at reactor.core.publisher.FluxConcatMap$ConcatMapImmediate.drain(FluxConcatMap.java:441)
at reactor.core.publisher.FluxConcatMap$ConcatMapImmediate.onSubscribe(FluxConcatMap.java:211)
at reactor.core.publisher.FluxIterable.subscribe(FluxIterable.java:139)
at reactor.core.publisher.FluxIterable.subscribe(FluxIterable.java:63)
at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:55)
at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:52)
at reactor.core.publisher.Mono.subscribe(Mono.java:4105)
at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.drain(MonoIgnoreThen.java:172)
at reactor.core.publisher.MonoIgnoreThen.subscribe(MonoIgnoreThen.java:56)
at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:55)
at reactor.netty.http.server.HttpServerHandle.onStateChange(HttpServerHandle.java:64)
at reactor.netty.tcp.TcpServerBind$ChildObserver.onStateChange(TcpServerBind.java:228)
at reactor.netty.http.server.HttpServerOperations.onInboundNext(HttpServerOperations.java:465)
at reactor.netty.channel.ChannelOperationsHandler.channelRead(ChannelOperationsHandler.java:90)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:377)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:363)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:355)
at reactor.netty.http.server.HttpTrafficHandler.channelRead(HttpTrafficHandler.java:167)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:377)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:363)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:355)
at io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.fireChannelRead(CombinedChannelDuplexHandler.java:436)
at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:321)
at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:295)
at io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:251)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:377)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:363)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:355)
at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:377)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:363)
at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:163)
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:714)
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:650)
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:576)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493)
at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.base/java.lang.Thread.run(Thread.java:830)
Репозиторий. java
public Flux<Proto.Customer> allCustomers() {//rxjava2 returns Flowable<> ... Flux<>
Flowable<Proto.Customer> customerFlowable =
db.select(queryAllCustomers).get(new CustomerResultSetMapper());
return Flux.from(customerFlowable);
}
AppHandler. java
public Mono<ServerResponse> getAllCustomers(ServerRequest request) {
Flux<Proto.Customer> customers = repository.allCustomers();
Proto.SearchResponse out = Proto.SearchResponse.newBuilder()
.addAllCustomers(customers.toIterable()).build();
return ServerResponse.ok()
.contentType(MediaType.APPLICATION_JSON)
.body(out, Proto.SearchResponse.class);
}