Я хочу использовать gRPC, чтобы позволить клиентам подписываться на события, генерируемые сервером.У меня RPC объявлен так:
rpc Subscribe (SubscribeRequest) returns (stream SubscribeResponse);
, где возвращаемый поток бесконечен.Чтобы «отписаться», клиенты отменяют RPC (кстати, есть ли более чистый способ?).
Я выяснил, как клиент может отменить вызов:
Context.CancellableContext cancellableContext =
Context.current().withCancellation();
cancellableContext.run(() -> {
stub.subscribe(request, callback);
});
// do other stuff / wait for reason to unsubscribe
cancellableContext.cancel(new InterruptedException());
Однако,Сервер, похоже, не замечает, что клиент отменил свой вызов.Я тестирую это с использованием фиктивного сервера:
@Override
public void subscribe(SubscribeRequest request,
StreamObserver<SubscribeResponse> responseObserver) {
// in real code, this will happen in a separate thread.
while (!Thread.interrupted()) {
responseObserver.onNext(SubscribeResponse.getDefaultInstance());
}
}
Сервер с радостью продолжит отправлять свои сообщения в эфир.Как сервер может распознать, что вызов был отменен клиентом, и таким образом прекратить отправку ответов?