Я смотрю на пример Simple RP C из grp c .io basi c tutorial :
@Override
public void getFeature(Point request, StreamObserver<Feature> responseObserver) {
responseObserver.onNext(checkFeature(request));
responseObserver.onCompleted();
}
...
private Feature checkFeature(Point location) {
for (Feature feature : features) {
if (feature.getLocation().getLatitude() == location.getLatitude()
&& feature.getLocation().getLongitude() == location.getLongitude()) {
return feature;
}
}
// No feature was found, return an unnamed feature.
return Feature.newBuilder().setName("").setLocation(location).build();
}
Есть ли какие-либо оговорки для взаимодействия с StreamObserver из других потоков? Например, скажем, checkFeature()
асинхронно обращается к другой службе, возвращая CompletableFuture:
@Override
public void getFeature(Point request, StreamObserver<Feature> responseObserver) {
checkFeature(request).
thenAccept(feature -> responseObserver.onNext(feature));
responseObserver.onCompleted();
}
Конечно, вышеперечисленное не сработает, потому что первый поток будет выполнить onCompleted()
перед возвратом функции. Итак, давайте исправим это:
@Override
public void getFeature(Point request, StreamObserver<Feature> responseObserver) {
checkFeature(request).
thenAccept(feature -> {
responseObserver.onNext(feature);
responseObserver.onCompleted();
});
}
Я думаю, это должно сработать, но я новичок в Java, поэтому мне интересно, какие есть разветвления. Например,
- Будет ли
Context.current()
согласованным? - Будет ли что-либо вызывать деструкцию или преждевременное закрытие StreamObserver, кроме
onNext()
для унарных вызовов и onError()
? - Есть ли лучшая практика?
Было бы здорово, если бы кто-нибудь также объяснил мне, как они рассуждают. Я попытался найти реальные реализации StreamObserver
, но не знал, что искать.