Как закрыть приложение потоков Kafka, если topi c недоступен? - PullRequest
2 голосов
/ 03 августа 2020

Я использую Kstreams с приложением SpringBoot. Я добавил приведенный ниже код для обработки выключения для потоков.

    KafkaStreams streams = new KafkaStreams(builder.build(), props);
    final CountDownLatch latch = new CountDownLatch(1);
    Runtime.getRuntime().addShutdownHook(new Thread("streams-shutdown-hook") {
        @Override
        public void run() {
            streams.close(Duration.ofMillis(30000));
            latch.countDown();
        }
    });
    try {
        streams.setUncaughtExceptionHandler((Thread t, Throwable e) -> {
            logger.error("Stream stopped with uncaught error", e);
            if (e.getCause() instanceof OffsetOutOfRangeException)
                streams.cleanUp();
            System.exit(1);
        });
        streams.start();
        latch.await();
    } catch (Throwable e) {
        logger.error("Stream stopped with error");
        System.exit(1);
    }

Однако, когда используемый topi c недоступен в kafka, ловушка выключения никогда не достигается, приложение просто регистрирует завершение выключения и защелка застревает в ожидании, поэтому приложение никогда не закрывается. Как в этом случае выполнить плавное завершение работы?

Ответы [ 2 ]

0 голосов
/ 05 августа 2020

Вы можете зарегистрировать слушателя состояния («состояние» означает РАБОТАЕТ, ПЕРЕБАЛАНСИРОВКА и т. Д. c - это не связано с хранилищем состояний ...) в экземпляре KafkaStreams.

Слушателем состояния является вызывается каждый раз при изменении состояния клиента. В случае, о котором вы упоминаете, состояние также будет переходить, поэтому вы можете обязательно «разблокировать» защелку.

Cf. https://docs.confluent.io/current/streams/monitoring.html#status -of-kafkastreams-instance

0 голосов
/ 03 августа 2020

Потребители Kafka постоянно ищут темы, на которые они подписаны, даже если они не существуют. Это более очевидно при использовании шаблонов регулярных выражений для подписки. UNKNOWN_TOPIC_OR_PARTITION не является ошибкой для потребителей.

Вам придется сначала вручную перечислить темы, а затем проверить, содержит ли этот список топи c, которые вы пытаетесь прочитать.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...