Насколько безопасно взаимодействовать с StreamObserver асинхронно, т.е. с Java 8+ CompletableFutures? - PullRequest
0 голосов
/ 05 мая 2020

Я смотрю на пример Simple RP C из grp c .io basi c tutorial :

@Override
public void getFeature(Point request, StreamObserver<Feature> responseObserver) {
  responseObserver.onNext(checkFeature(request));
  responseObserver.onCompleted();
}

...

private Feature checkFeature(Point location) {
  for (Feature feature : features) {
    if (feature.getLocation().getLatitude() == location.getLatitude()
        && feature.getLocation().getLongitude() == location.getLongitude()) {
      return feature;
    }
  }

  // No feature was found, return an unnamed feature.
  return Feature.newBuilder().setName("").setLocation(location).build();
}

Есть ли какие-либо оговорки для взаимодействия с StreamObserver из других потоков? Например, скажем, checkFeature() асинхронно обращается к другой службе, возвращая CompletableFuture:

@Override
public void getFeature(Point request, StreamObserver<Feature> responseObserver) {
  checkFeature(request).
      thenAccept(feature -> responseObserver.onNext(feature));
  responseObserver.onCompleted();
}

Конечно, вышеперечисленное не сработает, потому что первый поток будет выполнить onCompleted() перед возвратом функции. Итак, давайте исправим это:

@Override
public void getFeature(Point request, StreamObserver<Feature> responseObserver) {
  checkFeature(request).
      thenAccept(feature -> {
        responseObserver.onNext(feature);
        responseObserver.onCompleted();
      });
}

Я думаю, это должно сработать, но я новичок в Java, поэтому мне интересно, какие есть разветвления. Например,

  • Будет ли Context.current() согласованным?
  • Будет ли что-либо вызывать деструкцию или преждевременное закрытие StreamObserver, кроме onNext() для унарных вызовов и onError()?
  • Есть ли лучшая практика?

Было бы здорово, если бы кто-нибудь также объяснил мне, как они рассуждают. Я попытался найти реальные реализации StreamObserver, но не знал, что искать.

Ответы [ 2 ]

1 голос
/ 06 мая 2020

Использование thenAccept() для вызова onNext() и onCompleted() нормально, потому что наблюдатель не вызывается одновременно из нескольких потоков.

«Неисправный» пример, который вызывал onCompleted() отдельно, также был сломан потому что он мог вызвать наблюдателя из нескольких потоков без какой-либо формы синхронизации. StreamObservers не может быть вызван из нескольких потоков одновременно.

Использование thenAccept() не совсем правильно, поскольку оно не обрабатывает случай, когда будущее не работает. Таким образом, вам также необходимо получить Throwable, что можно сделать с помощью whenComplete():

@Override
public void getFeature(Point request, StreamObserver<Feature> responseObserver) {
  checkFeature(request).
      whenComplete((feature, t) -> {
        if (t != null) {
          responseObserver.onError(t);
        } else {
          responseObserver.onNext(feature);
          responseObserver.onCompleted();
        }
      });
}

Контекст может легко оказаться «неправильным» при обработке этой лямбды. Обычно мы ищем «архитектурные» решения, чтобы гарантировать распространение контекста, например, обертывание всех пулов потоков приложений в Context.currentContextExecutor() при их создании, чтобы не беспокоить отдельные сайты вызовов размножение. Я недостаточно знаком с CompletableFuture, чтобы разработать для него стратегию.

1 голос
/ 05 мая 2020

Будет ли Context.current () согласованным?

Context.current() использует ThreadLocal. если вы обращаетесь к нему в другом потоке, он не будет согласованным. Вы можете распространять контекст между потоками. Вы можете найти этот пост полезным.

Будет ли что-либо вызывать преждевременное уничтожение или закрытие StreamObserver, кроме onNext () для унарных вызовов и onError ()?

Да, нормальный поток StreamObserver заканчивается onError или onCompleted.

As StreamObserver javado c утверждает: «Поскольку отдельные StreamObserver не являются потокобезопасными, если их несколько потоки будут писать в StreamObserver одновременно, приложение должно синхронизировать вызовы ». Если вы вызываете StreamObserver одновременно , вам необходимо синхронизировать вызовы. Другими словами, если вы точно знаете, что он не будет вызываться одновременно, даже если вы используете несколько потоков, все должно быть в порядке. если только производительность не критична, поскольку она подвержена ошибкам. По крайней мере, он заслуживает хорошего комментария.

...