Я получаю вышеуказанную ошибку, когда выполняю агрегацию по коллекции MongoDB с помощью клиента Vert.x MongoDB. Может кто-нибудь подсказать, в чём здесь проблема?
at com.mongodb.operation.QueryHelper.translateCommandException(QueryHelper.java:31)
at com.mongodb.operation.AsyncQueryBatchCursor$CommandResultSingleResultCallback.onResult(AsyncQueryBatchCursor.java:294)
at com.mongodb.operation.AsyncQueryBatchCursor$CommandResultSingleResultCallback.onResult(AsyncQueryBatchCursor.java:276)
at com.mongodb.internal.async.ErrorHandlingResultCallback.onResult(ErrorHandlingResultCallback.java:49)
at com.mongodb.connection.DefaultServer$DefaultServerProtocolExecutor$2.onResult(DefaultServer.java:205)
at com.mongodb.internal.async.ErrorHandlingResultCallback.onResult(ErrorHandlingResultCallback.java:49)
at com.mongodb.connection.CommandProtocolImpl$1.onResult(CommandProtocolImpl.java:100)
at com.mongodb.connection.DefaultConnectionPool$PooledConnection$1.onResult(DefaultConnectionPool.java:458)
at com.mongodb.connection.UsageTrackingInternalConnection$2.onResult(UsageTrackingInternalConnection.java:110)
at com.mongodb.internal.async.ErrorHandlingResultCallback.onResult(ErrorHandlingResultCallback.java:49)
at com.mongodb.connection.InternalStreamConnection$2$1.onResult(InternalStreamConnection.java:383)
at com.mongodb.connection.InternalStreamConnection$2$1.onResult(InternalStreamConnection.java:359)
at com.mongodb.connection.InternalStreamConnection$MessageHeaderCallback$MessageCallback.onResult(InternalStreamConnection.java:651)
at com.mongodb.connection.InternalStreamConnection$MessageHeaderCallback$MessageCallback.onResult(InternalStreamConnection.java:618)
at com.mongodb.connection.InternalStreamConnection$5.completed(InternalStreamConnection.java:487)
at com.mongodb.connection.InternalStreamConnection$5.completed(InternalStreamConnection.java:484)
at com.mongodb.connection.AsynchronousSocketChannelStream$BasicCompletionHandler.completed(AsynchronousSocketChannelStream.java:233)
at com.mongodb.connection.AsynchronousSocketChannelStream$BasicCompletionHandler.completed(AsynchronousSocketChannelStream.java:216)
at java.base/sun.nio.ch.Invoker.invokeUnchecked(Invoker.java:127)
at java.base/sun.nio.ch.Invoker.invokeDirect(Invoker.java:158)
at java.base/sun.nio.ch.UnixAsynchronousSocketChannelImpl.implRead(UnixAsynchronousSocketChannelImpl.java:562)
at java.base/sun.nio.ch.AsynchronousSocketChannelImpl.read(AsynchronousSocketChannelImpl.java:277)
at java.base/sun.nio.ch.AsynchronousSocketChannelImpl.read(AsynchronousSocketChannelImpl.java:298)
at com.mongodb.connection.AsynchronousSocketChannelStream.readAsync(AsynchronousSocketChannelStream.java:128)
at com.mongodb.connection.InternalStreamConnection.readAsync(InternalStreamConnection.java:484)
at com.mongodb.connection.InternalStreamConnection.access$1100(InternalStreamConnection.java:74)
at com.mongodb.connection.InternalStreamConnection$MessageHeaderCallback.onResult(InternalStreamConnection.java:608)
at com.mongodb.connection.InternalStreamConnection$MessageHeaderCallback.onResult(InternalStreamConnection.java:593)
at com.mongodb.connection.InternalStreamConnection$5.completed(InternalStreamConnection.java:487)
at com.mongodb.connection.InternalStreamConnection$5.completed(InternalStreamConnection.java:484)
at com.mongodb.connection.AsynchronousSocketChannelStream$BasicCompletionHandler.completed(AsynchronousSocketChannelStream.java:233)
at com.mongodb.connection.AsynchronousSocketChannelStream$BasicCompletionHandler.completed(AsynchronousSocketChannelStream.java:216)
at java.base/sun.nio.ch.Invoker.invokeUnchecked(Invoker.java:127)
at java.base/sun.nio.ch.UnixAsynchronousSocketChannelImpl.finishRead(UnixAsynchronousSocketChannelImpl.java:439)
at java.base/sun.nio.ch.UnixAsynchronousSocketChannelImpl.finish(UnixAsynchronousSocketChannelImpl.java:191)
at java.base/sun.nio.ch.UnixAsynchronousSocketChannelImpl.onEvent(UnixAsynchronousSocketChannelImpl.java:213)
at java.base/sun.nio.ch.KQueuePort$EventHandlerTask.run(KQueuePort.java:312)
at java.base/sun.nio.ch.AsynchronousChannelGroupImpl$1.run(AsynchronousChannelGroupImpl.java:112)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:834)
Вот мой код:
// Builds the connection settings for the Mongo client from values returned by config()
// (presumably the verticle's configuration — confirm against the enclosing class).
// Every missing key falls back to an empty string, except the port, which falls back to "0".
JsonObject pricing_config = new JsonObject()
.put("host", config().getString("pricing_host", ""))
// The port is stored as a string in the configuration and parsed into an int here;
// Integer.parseInt throws NumberFormatException if the configured value is not numeric.
.put("port", Integer.parseInt(config().getString("pricing_port", "0")))
.put("username", config().getString("pricing_username", ""))
.put("password", config().getString("pricing_password", ""))
.put("db_name", config().getString("pricing_db_name", ""))
//.put("authSource", config().getString("pricing_authSource", ""))
// Sets the option named by MongoConstants.USE_OBJECT_ID_KEY to true.
.put(MongoConstants.USE_OBJECT_ID_KEY, true);
// Creates a client through MongoClient.createNonShared using this.vertx and the settings above.
MongoClient mongoClient = MongoClient.createNonShared(this.vertx, pricing_config);
// Aggregation options: batch size of 20 with allowDiskUse enabled.
// The max-time / max-await-time settings are left commented out.
AggregateOptions options = new AggregateOptions()
//.setMaxTime(20000)
//.setMaxAwaitTime(20000)
.setBatchSize(20)
.setAllowDiskUse(true);
// Runs the aggregation pipeline on aggregationCollection with the options above, converts the
// result to a Flowable, maps each emitted result to the string stored under its "_id" key,
// and collects those strings into a list.
// NOTE(review): this call goes through this.mongoClient (a field), whereas the client created
// earlier in this snippet is assigned to a local variable named mongoClient — presumably the
// two fragments come from different methods; confirm they refer to the same client.
return this.mongoClient.aggregateWithOptions(aggregationCollection, pipeline, options)
.toFlowable()
.map(res -> res.getString("_id"))
.toList();
Я пробовал различные настройки AggregateOptions, но это не помогло. Проблема в том, что клиент не может получить последующие пакеты: курсор завершается по тайм-ауту после выборки первого пакета. Если увеличить размер пакета (batchSize) до Integer.MAX_VALUE, запрос работает, только пока объём данных в одном пакете не превышает 16 МБ; в противном случае он снова завершается ошибкой.