Как правильно оформить шаблон публикации-подписки в grpc? - PullRequest
0 голосов
/ 01 марта 2019

Я пытаюсь реализовать шаблон sub sub, используя grpc, но я немного путаюсь с тем, как это сделать правильно.

мой прото: rpc call (google.protobuf.Empty) returns (stream Data);

клиент:

asynStub.call(Empty.getDefaultInstance(), new StreamObserver<Data>() {
         @Override
         public void onNext(Data value) {
           // process a data

         @Override
         public void onError(Throwable t) {

         }

         @Override
         public void onCompleted() {

         }
       });

   } catch (StatusRuntimeException e) {
     LOG.warn("RPC failed: {}", e.getStatus());
   }

   Thread.currentThread().join();

служба сервера:

public class Sender extends DataServiceGrpc.DataServiceImplBase implements Runnable {
  private final BlockingQueue<Data> queue;
  private final static HashSet<StreamObserver<Data>> observers = new LinkedHashSet<>();

  public Sender(BlockingQueue<Data> queue) {
    this.queue = queue;
  }

  @Override
  public void data(Empty request, StreamObserver<Data> responseObserver) {
    observers.add(responseObserver);
  }

  @Override
  public void run() {
    while (!Thread.currentThread().isInterrupted()) {
      try {
        // waiting for first element
        Data data = queue.take();
        // send head element
        observers.forEach(o -> o.onNext(data));

      } catch (InterruptedException e) {
        LOG.error("error: ", e);
        Thread.currentThread().interrupt();
      }
    }
  }
}

Как правильно удалить клиентов из глобальных наблюдателей?Как получить какой-то сигнал при обрыве соединения?
Как управлять переподключениями клиент-сервер?Как заставить клиента переподключиться, когда соединение обрывается?

Заранее спасибо!

1 Ответ

0 голосов
/ 06 марта 2019

При реализации вашего сервиса:

  @Override
  public void data(Empty request, StreamObserver<Data> responseObserver) {
    observers.add(responseObserver);
  }

Вам необходимо получить Контекст текущего запроса и прослушать для отмены .Для вызовов с одним запросом и несколькими ответами (так называемая потоковая передача на сервере) сгенерированный код gRPC упрощается для прямой передачи запроса.Это означает, что у вас нет прямого доступа к базовому ServerCall.Listener, как вы обычно слушаете клиентов, отключающихся и отменяющих.

Вместо этого каждому вызову gRPC соответствует Context, связанный с ним,который несет отмену и другие сигналы в пределах запроса.В вашем случае вам просто нужно прослушать отмену, добавив свой собственный слушатель, который затем безопасно удалит наблюдателя ответа из вашего связанного хеш-набора.


Что касается переподключений: клиенты gRPC будут автоматически переподключаться, если соединение разорвано, но обычно не будут повторять RPC, если это не безопасно.В случае потоковых RPC на сервере это обычно небезопасно, поэтому вам нужно будет повторить попытку RPC на своем клиенте напрямую.

...