Я пытаюсь настроить простой шаблон публикации / подписки через grp c, используя потоковую передачу службы вместе с заглушкой asyn c на клиенте. После реализации части потоковой передачи сообщений обратно клиенту я хотел обработать сценарий ios для разрыва соединения. Прямо сейчас я реализую часть, когда служба, например, закрыта, и клиент должен «восстановиться» после этой потери соединения.
Я прочитал и искал механизм повтора в google / github / so и, наконец, установил политика повторных попыток для метода в сервисе, который передает сообщения. Насколько я понял, механизм повтора должен работать, когда служба возвращает некоторые из retryableStatusCodes, определенных в политике повтора. После введения политики повторных попыток на клиенте я хотел протестировать ее, и результаты двух следующих сценариев ios смущают меня при попытке повторения.
Первый сценарий:
- connect процедура вызывается (через ~ n секунд преднамеренно сообщения не передаются обратно клиенту)
- служба закрыта
- onError не вызывается на клиенте
- служба снова подключена
- подключиться достигнуто снова достигнуто с повторной попыткой
Второй сценарий:
- connect вызывается процедура (через ~ n секунд первое сообщение приходит, сообщение обрабатывается в обработчике onNext на клиенте)
- служба закрыта
- onError is вызвано на клиенте
- сервис снова работает
- подключен не достигнут снова с повторной попыткой
В общем, что меня смущает, так это почему разница в поведении между этими двумя сценариями ios? Почему в первом сценарии обнаружено, что сервер вернул UNAVAILABLE, и попытка повторной попытки предпринята, но во втором случае даже при том же состоянии повторная попытка не работает?
Вот код для вызова connect on клиент, метод connect для службы и настройка политики повторных попыток на клиенте
client:
messageStub.withWaitForReady().connect(messagesRequest, new StreamObserver<>() {
@Override
public void onNext(MessageResponse messageResponse) {
//process new message
MessageDto message = new MessageDto();
message.setBody(messageResponse.getBody());
message.setTitle(messageResponse.getTitle());
messageService.broadcastMessage(message);
}
@Override
public void onError(Throwable throwable) {
//service went down
LOGGER.error(throwable.getStackTrace());
}
@Override
public void onCompleted() {
//This method should be called when user logs out of the application
LOGGER.info(String.format("Message streaming terminated for user %d", userId));
}
});
service:
@Override
public void connect(MessageRequest request, StreamObserver<MessageResponse> responseObserver) {
Long userId = request.getUserId();
ServerCallStreamObserver<MessageResponse > serverCallStreamObserver =
(ServerCallStreamObserver<MessageResponse >) responseObserver;
serverCallStreamObserver.setOnCancelHandler(getOnCancelHandler(userId));
registerClient(userId, serverCallStreamObserver);
//responseObserver.onCompleted() is left out so connection is not terminated
}
@EventListener
public void listenForMessages(MessageEvent messageEvent) {
//omitted code (just some data retrieving - populate conn and message vars)....
MessageResponse.Builder builder = MessageResponse.newBuilder();
StreamObserver<MessageResponse> observer = conn.getResponseObserver();
builder.setType(message.getType());
builder.setTitle(message.getTitle());
builder.setBody(message.getBody());
observer.onNext(builder.build())
}
retryPolicy:
{
"methodConfig" : [
{
"name": [
{
"service": "ch.example.proto.MessageService",
"method": "connect"
}
],
"retryPolicy": {
"maxAttempts": 10,
"initialBackoff": "5s",
"maxBackoff": "30s",
"backoffMultiplier": 2,
"retryableStatusCodes": ["UNAVAILABLE"]
}
}
]
}