Мне было поручено разрушить среду разработки и заново настроить ее с нуля, чтобы проверить наши процессы CI-CD;Единственная проблема заключалась в том, что я испортил создание одной темы, и поэтому приложение Kafka Streams вышло с ошибкой.
Я покопался в ней, нашел проблему и исправил ее, но, копаясь, я столкнулся с другой странной маленькойпроблема.
Я реализовал обработчик неожиданных исключений следующим образом:
streams.setUncaughtExceptionHandler((t, e) -> {
logger.fatal("Caught unhandled Kafka Streams Exception:", e);
// Do some exception handling.
streams.close();
// Maybe do some more exception handling.
// Open a lock that is waiting after streams.start() call
// to let application exit normally
shutdownLatch.countDown();
});
Проблема в том, что если приложение выдало исключение из-за ошибки темы, когда KafkaStreams :: close вызывает приложениекажется, что в WindowsSelectorImpl :: poll отсутствует блокировка после попытки вызова KafkaStreams :: waitOnState.
Я подумал, что это может быть проблема с вызовом KafkaStreams :: close внутри обработчика исключений, но я нашел это SO и комментарий от Matthias J. Sax , в котором говорится, что в обработчике исключений должно быть нормально вызывать KafkaStreams :: Close с предупреждением о том, что KafkaStreams :: close нельзя вызывать из нескольких потоков.
Проблема в том, что я хочу реализовать хук отключения, чтобы изящно убить приложение Steamпо запросу, а также реализовать обработчик UnexpectedException для очистки и корректного завершения в случае исключений.
Я нашел следующее решение, которое проверяет состояние KafkaStreams перед вызовом close, и оно действительно работает, ноэто кажется немного ненадежным, так как я мог видеть другие случаи, кроме запуска (возможно, в ожидании), когда мы хотели бы обеспечить вызов KafkaStreams :: close, который он вызывал.
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
logger.fatal("Caught Shutdown request");
// Do some shutdown cleanup.
if (streams.state().isRunning())
{
If this hook is called due to the Main exiting after handling
an exception we don't want to call close again. It doesn't
cause any errors but logs that the application was closed
a second time.
streams.close(100L, TimeUnit.MILLISECONDS);
}
// Maybe do a little bit more clean up before system exits.
System.exit(0);
}));
streams.setUncaughtExceptionHandler((t, e) -> {
logger.fatal("Caught unhandled Kafka Streams Exception:", e);
// Do some exception handling.
if (streams.state().isRunning())
{
streams.close(100L, TimeUnit.MILLISECONDS);
}
// Maybe do some more exception handling.
// Open the Gate to let application exit normally
shutdownLatch.countDown();
// Or Optionally call halt to immediately terminate and prevent call to Shutdown hook.
Runtime.getRuntime().halt(0);
});
Любые предложения о том, почему вызывать KafkaSteams: close inобработчик исключений будет вызывать такие проблемы, или если будет более эффективный способ реализовать обработчик завершения работы и обработчик исключений в то же время, это будет высоко ценится?