Я использую реактивный клиент MongoDB (reactive Mongo client), и у меня в коллекции около 5 миллионов записей.
Вот как выглядит код:
// Latch released exactly once, when the stream terminates (error or completion),
// so the calling thread can block until the whole collection has been consumed.
final CountDownLatch countDownload = new CountDownLatch(1);
testDao.getAll(collection)
        .subscribe(
                // onNext: invoked once per emitted document.
                doc -> System.out.println("Received document here ...."),
                // onError: registered on the subscriber itself so the failure is
                // actually handled instead of surfacing as an unhandled-error report.
                e -> {
                    System.out.println("Received error here ....");
                    countDownload.countDown();
                },
                // onComplete: the source finished without error.
                () -> {
                    System.out.println("Completed here ....");
                    countDownload.countDown();
                });
// Blocks until one of the terminal callbacks above has fired.
countDownload.await();
Метод getAll (в приведённом ниже коде он объявлен под именем getNationalIDDetails):
/**
 * Returns the result of a no-argument {@code find()} on the given collection as a {@link Flux}.
 *
 * <p>The collection handle is obtained via {@code mongoClient.get(collection)} and the
 * publisher returned by {@code find()} is adapted with {@code Flux.from}.
 *
 * <p>NOTE(review): the calling snippet invokes this as {@code getAll(collection)} while the
 * method is declared as {@code getNationalIDDetails} — confirm which name is intended.
 *
 * <p>NOTE(review): no batch size or cursor-timeout option is configured on the {@code find()}
 * publisher here; if the server discards the cursor between batches during a long scan,
 * those options may need to be set — confirm against the driver version in use.
 *
 * @param collection key passed to {@code mongoClient.get} — presumably the collection name
 * @return a {@code Flux} emitting the documents produced by {@code find()}
 */
public Flux<Document> getNationalIDDetails(final String collection) {
return Flux.from(mongoClient.get(collection).find());
}
Код должен прочитать и обработать все 5 миллионов записей, но после обработки примерно 10 тыс. записей я получаю следующую ошибку:
"throwable":{
"class":"com.mongodb.MongoCursorNotFoundException",
"msg":"Query failed with error code -5 and error message 'Cursor 622104009044 not found on server test.com:10901' on server test.com:10901",
"stack":[
"com.mongodb.operation.QueryHelper.translateCommandException(QueryHelper.java:27)",
"com.mongodb.operation.AsyncQueryBatchCursor$CommandResultSingleResultCallback.onResult(AsyncQueryBatchCursor.java:295)",
"com.mongodb.operation.AsyncQueryBatchCursor$CommandResultSingleResultCallback.onResult(AsyncQueryBatchCursor.java:277)",
"com.mongodb.internal.async.ErrorHandlingResultCallback.onResult(ErrorHandlingResultCallback.java:49)",
"com.mongodb.internal.connection.DefaultServer$DefaultServerProtocolExecutor$2.onResult(DefaultServer.java:224)",
"com.mongodb.internal.async.ErrorHandlingResultCallback.onResult(ErrorHandlingResultCallback.java:49)",
"com.mongodb.internal.connection.CommandProtocolImpl$1.onResult(CommandProtocolImpl.java:83)",
"com.mongodb.internal.connection.DefaultConnectionPool$PooledConnection$1.onResult(DefaultConnectionPool.java:461)",
"com.mongodb.internal.connection.UsageTrackingInternalConnection$2.onResult(UsageTrackingInternalConnection.java:111)",
"com.mongodb.internal.async.ErrorHandlingResultCallback.onResult(ErrorHandlingResultCallback.java:49)",
"com.mongodb.internal.connection.InternalStreamConnection$2$1.onResult(InternalStreamConnection.java:397)",
"com.mongodb.internal.connection.InternalStreamConnection$2$1.onResult(InternalStreamConnection.java:372)",
"com.mongodb.internal.connection.InternalStreamConnection$MessageHeaderCallback$MessageCallback.onResult(InternalStreamConnection.java:667)",
"com.mongodb.internal.connection.InternalStreamConnection$MessageHeaderCallback$MessageCallback.onResult(InternalStreamConnection.java:634)",
"com.mongodb.internal.connection.InternalStreamConnection$5.completed(InternalStreamConnection.java:510)",
"com.mongodb.internal.connection.InternalStreamConnection$5.completed(InternalStreamConnection.java:507)",
"com.mongodb.connection.netty.NettyStream.readAsync(NettyStream.java:233)",
"com.mongodb.internal.connection.InternalStreamConnection.readAsync(InternalStreamConnection.java:507)",
"com.mongodb.internal.connection.InternalStreamConnection.access$1000(InternalStreamConnection.java:74)",
"com.mongodb.internal.connection.InternalStreamConnection$MessageHeaderCallback.onResult(InternalStreamConnection.java:624)",
"com.mongodb.internal.connection.InternalStreamConnection$MessageHeaderCallback.onResult(InternalStreamConnection.java:609)",
"com.mongodb.internal.connection.InternalStreamConnection$5.completed(InternalStreamConnection.java:510)",
"com.mongodb.internal.connection.InternalStreamConnection$5.completed(InternalStreamConnection.java:507)",
"com.mongodb.connection.netty.NettyStream.readAsync(NettyStream.java:233)",
"com.mongodb.connection.netty.NettyStream.handleReadResponse(NettyStream.java:263)",
"com.mongodb.connection.netty.NettyStream.access$800(NettyStream.java:69)",
"com.mongodb.connection.netty.NettyStream$InboundBufferHandler.channelRead0(NettyStream.java:322)",
"com.mongodb.connection.netty.NettyStream$InboundBufferHandler.channelRead0(NettyStream.java:319)",
"io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)",
"io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:359)",
"io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:345)",
"io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:337)",
"io.netty.handler.ssl.SslHandler.unwrap(SslHandler.java:1476)",
"io.netty.handler.ssl.SslHandler.decodeJdkCompatible(SslHandler.java:1225)",
"io.netty.handler.ssl.SslHandler.decode(SslHandler.java:1272)",
"io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:502)",
"io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:441)",
"io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:278)",
"io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:359)",
"io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:345)",
"io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:337)",
"io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1408)",
"io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:359)",
"io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:345)",
"io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:930)",
"io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:163)",
"io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:677)",
"io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:612)",
"io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:529)",
"io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:491)",
"io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:905)",
"io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)",
"java.lang.Thread.run(Thread.java:834)"
]
}
Не понимаю, почему это происходит после обработки 10 тыс. записей.