Крюк отключения Kafka Streams и обработка неожиданных исключений в одном приложении Stream - PullRequest
0 голосов
/ 27 сентября 2018

Мне было поручено разрушить среду разработки и заново настроить ее с нуля, чтобы проверить наши процессы CI-CD;Единственная проблема заключалась в том, что я испортил создание одной темы, и поэтому приложение Kafka Streams вышло с ошибкой.

Я покопался в ней, нашел проблему и исправил ее, но, копаясь, я столкнулся с другой странной маленькойпроблема.

Я реализовал обработчик неожиданных исключений следующим образом:

streams.setUncaughtExceptionHandler((t, e) -> {
    logger.fatal("Caught unhandled Kafka Streams Exception:", e);
    // Do some exception handling.
    streams.close();

    // Maybe do some more exception handling.
    // Open a lock that is waiting after streams.start() call 
    // to let application exit normally
    shutdownLatch.countDown();
});

Проблема в том, что если приложение выдало исключение из-за ошибки темы, когда KafkaStreams :: close вызывает приложениекажется, что в WindowsSelectorImpl :: poll отсутствует блокировка после попытки вызова KafkaStreams :: waitOnState.

Я подумал, что это может быть проблема с вызовом KafkaStreams :: close внутри обработчика исключений, но я нашел это SO и комментарий от Matthias J. Sax , в котором говорится, что в обработчике исключений должно быть нормально вызывать KafkaStreams :: Close с предупреждением о том, что KafkaStreams :: close нельзя вызывать из нескольких потоков.

Проблема в том, что я хочу реализовать хук отключения, чтобы изящно убить приложение Steamпо запросу, а также реализовать обработчик UnexpectedException для очистки и корректного завершения в случае исключений.

Я нашел следующее решение, которое проверяет состояние KafkaStreams перед вызовом close, и оно действительно работает, ноэто кажется немного ненадежным, так как я мог видеть другие случаи, кроме запуска (возможно, в ожидании), когда мы хотели бы обеспечить вызов KafkaStreams :: close, который он вызывал.

Runtime.getRuntime().addShutdownHook(new Thread(() -> {
    logger.fatal("Caught Shutdown request");
    // Do some shutdown cleanup.
    if (streams.state().isRunning())
    {
        If this hook is called due to the Main exiting after handling 
        an exception we don't want to call close again. It doesn't 
        cause any errors but logs that the application was closed 
        a second time.
        streams.close(100L, TimeUnit.MILLISECONDS);
    }
    // Maybe do a little bit more clean up before system exits.
    System.exit(0);

}));

streams.setUncaughtExceptionHandler((t, e) -> {
    logger.fatal("Caught unhandled Kafka Streams Exception:", e);
    // Do some exception handling.
    if (streams.state().isRunning())
    {
        streams.close(100L, TimeUnit.MILLISECONDS);
    }
    // Maybe do some more exception handling.

    // Open the Gate to let application exit normally
    shutdownLatch.countDown();
    // Or Optionally call halt to immediately terminate and prevent call to Shutdown hook.
    Runtime.getRuntime().halt(0);
});

Любые предложения о том, почему вызывать KafkaSteams: close inобработчик исключений будет вызывать такие проблемы, или если будет более эффективный способ реализовать обработчик завершения работы и обработчик исключений в то же время, это будет высоко ценится?

1 Ответ

0 голосов
/ 30 сентября 2018

Вызов close() из обработчика исключений и из ловушки отключения немного отличается.close() может зайти в тупик, если вызывается из ловушки отключения (см. https://issues.apache.org/jira/browse/KAFKA-4366) и, следовательно, вы должны вызывать ее с таймаутом.

Кроме того, проблема связана с вызовом System.exit() изнутринеобработанный обработчик исключений, как описано в Jira. В общем, вызов System.exit() довольно резок и его следует избегать ИМХО.

Ваше решение, по-видимому, также не на 100% надежно, потому что streams.state().isRunning() может привести кв состоянии гонки.

Альтернативой использованию тайм-аута может быть установка только AtomicBoolean как в ловушке завершения работы, так и в обработчике исключений, и использование потока "main ()" для вызова close, если логическое значениефлаг установлен в true:

private final static AtomicBoolean stopStreams = new AtomicBoolean(false);

public static void main(String[] args) {
  // do stuff

  KafkaStreams streams = ...
  stream.setUncaughtExceptionHandler((t, e) -> {
    stopStreams.set(true);
  });

  Runtime.getRuntime().addShutdownHook(new Thread(() -> {
    stopStreams.set(true);
  });

  while (!stopStreams.get()) {
    Thread.sleep(1000);
  }
  streams.close();
}
...