Использование нескольких ResponseObserver.onNext застрянет в сервисе - gRPC Java - PullRequest
1 голос
/ 09 октября 2019

Я хочу отправить данные порциями по 50 с помощью gRPC, однако при многократном запуске responseObserver.onNext он просто зависает и не отправляет данные. Но когда я останавливаю поток, я получаю данные.

Это мой код:

// List<List<MyClass>> listOfMyClass
for (List<MyClass> myClasses : listOfMyClass) {
  responseObserver.onNext(buildReply(myClasses));
}
responseObserver.onCompleted();

Это застрянет. Но если я запускаю responseObserver.onNext(buildReply(myClasses)); только один раз, я получу данные мгновенно.

Мой прото:

message Request {
    string number = 1;
}

message Reply {
    repeated CustomMessage results = 1;
}

service Service {
    rpc MyRequest (Request) returns (stream Reply) {}
}

Я использую графический интерфейс с именем https://github.com/uw-labs/bloomrpc,который должен отображать поток легко.

1 Ответ

0 голосов
/ 23 октября 2019

Интерфейс StreamObserver не блокирует и не обеспечивает обратного давления для исходящих данных. Чтобы оказать клиенту обратное давление, вам нужно привести StreamObserver, который клиент использует для отправки данных в ServerCallStreamObserver, и наблюдать бит «готовности». Как то так:

ClientCallStreamObserver<MyClass> clientCallStreamResponseObserver =
      (ClientCallStreamObserver<File>) responseObserver;
clientCallStreamObserver.setOnReadyHandler(new Runnable() {
    public void run() {
      sendUntilNotReady(clientCallStreamResponseObserver);
    }
});
sendUntilNotReady(clientCallStreamResponseObserver);

...

private void sendUntilNotReady(ClientCallStreamObserver<MyClass> clientCallStreamResponseObserver) {
  while (clientCallStreamResponseObserver.isReady()) {
    clientCallStreamResponseObserver.onNext(... /* build reply for next item in listOfMyClass */);
  }
}
...