grp c - java: правильная обработка повторов на клиенте для потокового вызова службы - PullRequest
0 голосов
/ 19 марта 2020

Я пытаюсь настроить простой шаблон публикации / подписки через grp c, используя потоковую передачу службы вместе с заглушкой asyn c на клиенте. После реализации части потоковой передачи сообщений обратно клиенту я хотел обработать сценарий ios для разрыва соединения. Прямо сейчас я реализую часть, когда служба, например, закрыта, и клиент должен «восстановиться» после этой потери соединения.

Я прочитал и искал механизм повтора в google / github / so и, наконец, установил политика повторных попыток для метода в сервисе, который передает сообщения. Насколько я понял, механизм повтора должен работать, когда служба возвращает некоторые из retryableStatusCodes, определенных в политике повтора. После введения политики повторных попыток на клиенте я хотел протестировать ее, и результаты двух следующих сценариев ios смущают меня при попытке повторения.

Первый сценарий:

  • connect процедура вызывается (через ~ n секунд преднамеренно сообщения не передаются обратно клиенту)
  • служба закрыта
  • onError не вызывается на клиенте
  • служба снова подключена
  • подключиться достигнуто снова достигнуто с повторной попыткой

Второй сценарий:

  • connect вызывается процедура (через ~ n секунд первое сообщение приходит, сообщение обрабатывается в обработчике onNext на клиенте)
  • служба закрыта
  • onError is вызвано на клиенте
  • сервис снова работает
  • подключен не достигнут снова с повторной попыткой

В общем, что меня смущает, так это почему разница в поведении между этими двумя сценариями ios? Почему в первом сценарии обнаружено, что сервер вернул UNAVAILABLE, и попытка повторной попытки предпринята, но во втором случае даже при том же состоянии повторная попытка не работает?

Вот код для вызова connect on клиент, метод connect для службы и настройка политики повторных попыток на клиенте

client:

messageStub.withWaitForReady().connect(messagesRequest, new StreamObserver<>() {
    @Override
    public void onNext(MessageResponse messageResponse) {
        //process new message
        MessageDto message = new MessageDto();
        message.setBody(messageResponse.getBody());
        message.setTitle(messageResponse.getTitle());

        messageService.broadcastMessage(message);
    }

    @Override
    public void onError(Throwable throwable) {
        //service went down
        LOGGER.error(throwable.getStackTrace());
    }

    @Override
    public void onCompleted() {
        //This method should be called when user logs out of the application
        LOGGER.info(String.format("Message streaming terminated for user %d", userId));
    }
});
service:

@Override
public void connect(MessageRequest request, StreamObserver<MessageResponse> responseObserver) {
    Long userId = request.getUserId();

    ServerCallStreamObserver<MessageResponse > serverCallStreamObserver =
        (ServerCallStreamObserver<MessageResponse >) responseObserver;
    serverCallStreamObserver.setOnCancelHandler(getOnCancelHandler(userId));
    registerClient(userId, serverCallStreamObserver);
    //responseObserver.onCompleted() is left out so connection is not terminated
}


@EventListener
public void listenForMessages(MessageEvent messageEvent) {
    //omitted code (just some data retrieving - populate conn and message vars)....

    MessageResponse.Builder builder = MessageResponse.newBuilder();
    StreamObserver<MessageResponse> observer = conn.getResponseObserver();
    builder.setType(message.getType());
    builder.setTitle(message.getTitle());
    builder.setBody(message.getBody());

    observer.onNext(builder.build())
}

retryPolicy:

{
  "methodConfig" : [
    {
      "name": [
        {
          "service": "ch.example.proto.MessageService",
          "method": "connect"
        }
      ],
      "retryPolicy": {
        "maxAttempts": 10,
        "initialBackoff": "5s",
        "maxBackoff": "30s",
        "backoffMultiplier": 2,
        "retryableStatusCodes": ["UNAVAILABLE"]
      }
    }
  ]
}

1 Ответ

0 голосов
/ 31 марта 2020

Проблема в том, что при получении сообщения фиксирует RP C. Это обсуждается в gRF C A6 Client Retries . В нем упоминается Response-Headers, которые неявно отправляются, когда сервер отвечает первым сообщением.

По существу, после того, как gRP C передал данные обратно клиенту, автоматически повторить попытку не удастся. Если gRP C повторил попытку, как он должен объединить новый поток с тем, на что он уже ответил? Должен ли он пропустить первые N ответов? Но что, если ответы теперь другие? Проблема еще хуже для метаданных (доставленных через Response-Headers), так как они не могут быть предоставлены клиенту во второй раз.

gRP C может воспроизвести запросы клиента для несколько бэкэндов, но как только он начнет получать ответ от бэкэнда, он станет «фиксированным» к этому бэкэнду и не сможет изменить свое решение.

Вам потребуется повторная попытка на уровне приложения, чтобы восстановить sh поток. Когда клиент восстанавливает поток, ему может потребоваться изменить запрос, чтобы сообщить серверу, какие сообщения клиент уже получил.

...