Каковы лучшие практики потоковой передачи большого количества данных с GRP C? Я отправляю запрос на сервер GRP C, который будет передавать данные обратно. Данные, отправляемые обратно, могут содержать порядка 100 протобуф-сообщений или пару 100000 протобуф-сообщений.
service CrossEngineSelector {
rpc QueryDB (QueryRequest) returns (stream QueryResponse) {}
}
Сервер представляет собой простую реализацию, которая отправляет сообщения protobuf. * 1004 *
@Override
public void queryBD(QueryRequest request, StreamObserver<QueryResponse> responseObserver) {
Iterables.partition( dataLoader.getData(), 1000).forEach(batch -> {
responseObserver.onNext(QueryResponse.newBuilder().addAllRows(batch).build());
});
responseObserver.onCompleted();
}
На стороне клиента используется blockingStub, который вызывает этот метод (сгенерированный код protobuf):
public Iterator<QueryResponse> queryDB(QueryRequest request) {
return ClientCalls.blockingServerStreamingCall(this.getChannel(),
CrossEngineSelectorGrpc.getQueryEnginesMethod(),
this.getCallOptions(), request);
}
Как только клиент вызывает этот метод, я просто перебираю QueryResponse.
Все это прекрасно работает для потоков, которые отправляют только небольшое количество сообщений. Когда я пытаюсь передать 100 000 сообщений, максимальный размер входящего сообщения продолжает увеличиваться, и я получаю сообщение об ошибке: RESOURCE_EXHAUSTED: Compressed gRPC message exceeds maximum size 4194304: 4196022 bytes read
Мое текущее исправление для этого состоит в установке максимального размера входящего сообщения. очень высокий + 1 Гб. Это жестко закодированное значение, поэтому оно не масштабируется. Клиент не знает, сколько сообщений вернет сервер. Я мог бы столкнуться с вариантами использования, даже если 1-мегабайтный максимальный размер входящего сообщения будет недостаточным.
Я надеюсь, что я делаю ошибку реализации. Я надеюсь, что есть способ сброса размера сообщения для каждого потока (onNext ()) с сервера, или это нормально, он продолжает увеличивать размер сообщения?
Я бы предположил, что один responseObserver.onNext(QueryResponse.newBuilder().addAllRows(batch).build());
отправляет пару мегабайт, и это будет считаться размером сообщения, а не всего потока, пока он выполняется.
Я использую Micronaut как для сервера, так и для клиента.