Перехват исключений слушателя в долго работающей облачной службе подписчика PubSub - PullRequest
0 голосов
/ 09 мая 2019

Я пытаюсь написать долгосрочную подписку на Java. Я настроил Слушатели для прослушивания любых сбоев внутри службы подписчика. Я пытаюсь сделать это отказоустойчивым, и я не совсем понимаю несколько вещей, Ниже приведены мои сомнения / вопросы.

  1. Я следовал базовой настройке, показанной здесь https://github.com/googleapis/google-cloud-java/blob/master/google-cloud-examples/src/main/java/com/google/cloud/examples/pubsub/snippets/SubscriberSnippets.java. В частности, я установил addListener, как показано ниже.

Как показано в следующем коде, initializeSubscriber действует как переменная состояния, которая будет определять, следует ли перезапустить службу подписчика. Внутри цикла while эта переменная постоянно отслеживается, чтобы определить, требуется ли перезапуск.

Мой вопрос здесь 1. Как вызвать исключение внутри сбойного метода Subscriber.Listener и перехватить его в главном цикле while. Я попытался добавить новое Exception () в методе fail и перехватить его в блоке catch внутри, однако я не могу скомпилировать код, так как это проверенное исключение. 2. Как показано здесь, я использую поток Java Executor для запуска Listener. Как мне справиться с ошибками Слушателя? Смогу ли я перехватывать ошибки прослушивателя в общем блоке перехвата исключений, как показано здесь?

try {
 boolean initializeSubscriber = true;
    while (true) {
        try {
           if (initializeSubscriber) { 
             createSingleThreadedSubscriber();
             addErrorListenerToSubscriber();
             subscriber.startAsync().awaitRunning();
             initializeSubscriber = false;
           }

          // Checks the status of subscriber service every minute
          Thread.sleep(60000);

        } catch (Exception ex) {
          LOGGER.error("Could not start the Subscriber service", ex);
          cleanupSubscriber();
          initializeSubscriber = true;
        }
    }
} catch (RuntimeException e) {

} finally {
    shutdown();
}
private void addErrorListenerToSubscriber() {
    subscriber.addListener(
      new Subscriber.Listener() {
          @Override
          public void failed(Subscriber.State from, Throwable failure) throws RuntimeException { 
            LOGGER.info("Subscriber reached a failed state due to " + failure.getMessage()
                + ",Restarting Subscriber service");
            initializeSubscriber = true; 
          }
      },
      Executors.newSingleThreadExecutor());
  }

  private void cleanupSubscriber() {

    try {
      if (subscriber != null) {
        subscriber.stopAsync().awaitTerminated();
      }
      if (!subscriptionListener.isShutdown()) {
        subscriptionListener.shutdown();
      }
    } catch (Exception ex) {
      LOGGER.error("Error in cleaning up Subscriber thread " + ex);
    }
  }

1 Ответ

1 голос
/ 09 мая 2019

Нет необходимости добавлять слушателя к подписчику, если вы просто хотите воссоздать подписчика в случае сбоя.Вместо этого вы можете перехватить исключение на awaitTerminated:

try {
  boolean initializeSubscriber = true;
  while (initializeSubscriber) {
    try { 
      createSingleThreadedSubscriber();
      subscriber.startAsync().awaitRunning();
      initializeSubscriber = false;
      subscriber.awaitTerminated();
    } catch (Exception ex) {
      LOGGER.error("Error in the Subscriber service", ex);
      cleanupSubscriber();
      initializeSubscriber = true;
    }
  }
}  catch (RuntimeException e) {
} finally {
  shutdown();
}

Если абонент успешно завершит работу из-за вызова stopAsync, awaitTerminated не сгенерирует исключение.Если было какое-то исключение, то awaitTerminated выдаст IllegalStateException, потому что состояние будет FAILED вместо TERMINATED.

Обратите внимание, что временные ошибки обрабатываются самой библиотекой.Например, если сервер на короткое время становится недоступным из-за перебоев в работе сети, библиотека без проблем пересоединится и продолжит доставлять сообщения.Сбои, которые приводят к изменению состояния подписчика, - это, вероятно, постоянные сбои, такие как проблемы с разрешениями (когда у учетной записи, на которой работает подписчик, нет разрешения на подписку), или проблемы с ресурсами (например, после удаления подписки).В этих случаях постоянных сбоев воссоздание подписчика, скорее всего, приведет к той же самой ошибке, если не предпринять ручные шаги для вмешательства и устранения проблемы.

...