Монго реактивный клиент - com.mongodb.MongoCursorNotFoundException - PullRequest
0 голосов
/ 04 июня 2019

Я использую реактивный моноклиент и у меня около 5 миллионов записей в коллекции.

Вот как реагирует код.

final CountDownLatch countDownload = new CountDownLatch(1);
testDao.getAll(collection)
                    .doOnError(e -> {

                        System.out.println("Received error here ....");
                        countDownload.countDown();
                    }).doOnComplete(() -> {

                         System.out.println("Completed here ....");
                        countDownload.countDown();
                    }).subscribe(doc -> {
                        System.out.println("Received document here ....");
                    });
        });

countDownload.await();

метод getAll,

public Flux<Document> getNationalIDDetails(final String collection) {
        return Flux.from(mongoClient.get(collection).find());
    }

Это должно быть чтение и обработка всех 5 миллионов записей. Но, получая приведенную ниже ошибку после обработки около 10K записей.

"throwable":{  
      "class":"com.mongodb.MongoCursorNotFoundException",
      "msg":"Query failed with error code -5 and error message 'Cursor 622104009044 not found on server test.com:10901' on server test.com:10901",
      "stack":[  
         "com.mongodb.operation.QueryHelper.translateCommandException(QueryHelper.java:27)",
         "com.mongodb.operation.AsyncQueryBatchCursor$CommandResultSingleResultCallback.onResult(AsyncQueryBatchCursor.java:295)",
         "com.mongodb.operation.AsyncQueryBatchCursor$CommandResultSingleResultCallback.onResult(AsyncQueryBatchCursor.java:277)",
         "com.mongodb.internal.async.ErrorHandlingResultCallback.onResult(ErrorHandlingResultCallback.java:49)",
         "com.mongodb.internal.connection.DefaultServer$DefaultServerProtocolExecutor$2.onResult(DefaultServer.java:224)",
         "com.mongodb.internal.async.ErrorHandlingResultCallback.onResult(ErrorHandlingResultCallback.java:49)",
         "com.mongodb.internal.connection.CommandProtocolImpl$1.onResult(CommandProtocolImpl.java:83)",
         "com.mongodb.internal.connection.DefaultConnectionPool$PooledConnection$1.onResult(DefaultConnectionPool.java:461)",
         "com.mongodb.internal.connection.UsageTrackingInternalConnection$2.onResult(UsageTrackingInternalConnection.java:111)",
         "com.mongodb.internal.async.ErrorHandlingResultCallback.onResult(ErrorHandlingResultCallback.java:49)",
         "com.mongodb.internal.connection.InternalStreamConnection$2$1.onResult(InternalStreamConnection.java:397)",
         "com.mongodb.internal.connection.InternalStreamConnection$2$1.onResult(InternalStreamConnection.java:372)",
         "com.mongodb.internal.connection.InternalStreamConnection$MessageHeaderCallback$MessageCallback.onResult(InternalStreamConnection.java:667)",
         "com.mongodb.internal.connection.InternalStreamConnection$MessageHeaderCallback$MessageCallback.onResult(InternalStreamConnection.java:634)",
         "com.mongodb.internal.connection.InternalStreamConnection$5.completed(InternalStreamConnection.java:510)",
         "com.mongodb.internal.connection.InternalStreamConnection$5.completed(InternalStreamConnection.java:507)",
         "com.mongodb.connection.netty.NettyStream.readAsync(NettyStream.java:233)",
         "com.mongodb.internal.connection.InternalStreamConnection.readAsync(InternalStreamConnection.java:507)",
         "com.mongodb.internal.connection.InternalStreamConnection.access$1000(InternalStreamConnection.java:74)",
         "com.mongodb.internal.connection.InternalStreamConnection$MessageHeaderCallback.onResult(InternalStreamConnection.java:624)",
         "com.mongodb.internal.connection.InternalStreamConnection$MessageHeaderCallback.onResult(InternalStreamConnection.java:609)",
         "com.mongodb.internal.connection.InternalStreamConnection$5.completed(InternalStreamConnection.java:510)",
         "com.mongodb.internal.connection.InternalStreamConnection$5.completed(InternalStreamConnection.java:507)",
         "com.mongodb.connection.netty.NettyStream.readAsync(NettyStream.java:233)",
         "com.mongodb.connection.netty.NettyStream.handleReadResponse(NettyStream.java:263)",
         "com.mongodb.connection.netty.NettyStream.access$800(NettyStream.java:69)",
         "com.mongodb.connection.netty.NettyStream$InboundBufferHandler.channelRead0(NettyStream.java:322)",
         "com.mongodb.connection.netty.NettyStream$InboundBufferHandler.channelRead0(NettyStream.java:319)",
         "io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)",
         "io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:359)",
         "io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:345)",
         "io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:337)",
         "io.netty.handler.ssl.SslHandler.unwrap(SslHandler.java:1476)",
         "io.netty.handler.ssl.SslHandler.decodeJdkCompatible(SslHandler.java:1225)",
         "io.netty.handler.ssl.SslHandler.decode(SslHandler.java:1272)",
         "io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:502)",
         "io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:441)",
         "io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:278)",
         "io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:359)",
         "io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:345)",
         "io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:337)",
         "io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1408)",
         "io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:359)",
         "io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:345)",
         "io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:930)",
         "io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:163)",
         "io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:677)",
         "io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:612)",
         "io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:529)",
         "io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:491)",
         "io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:905)",
         "io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)",
         "java.lang.Thread.run(Thread.java:834)"
      ]
   }

Не уверен, почему это происходит после обработки 10 тыс. Записей.

...