Kafka Streams не может восстановиться в случае исключения при обработке сообщений - PullRequest
0 голосов
/ 17 мая 2018

Я играю с Kafka Streams и столкнулся с ситуацией, которая кажется мне немного странной. Я искал по этому вопросу, но не мог найти правильный ответ. Во время обработки элементов в потоке, если возникает какое-либо необработанное исключение, возникает следующая ошибка, и Kafka Streams не может восстановиться, останавливает обработку дальнейших сообщений, затем мне нужно перезапустить приложение.

2018-05-17 11:00:51.304  INFO 43810 --- [-StreamThread-3] o.a.k.s.p.internals.StreamThread         : stream-thread [snapshot-stream-application-a1dbc5d4-1742-4af2-9919-d4f5f861bb36-StreamThread-3] Stream thread shutdown complete
2018-05-17 11:00:51.304  WARN 43810 --- [-StreamThread-3] o.a.k.s.p.internals.StreamThread         : stream-thread [snapshot-stream-application-a1dbc5d4-1742-4af2-9919-d4f5f861bb36-StreamThread-3] Unexpected state transition from RUNNING to DEAD.
2018-05-17 11:00:51.305 ERROR 43810 --- [-StreamThread-3] c.u.kafkastreams.config.SnapshotStream   : Error while processing:

org.apache.kafka.streams.errors.ProcessorStateException: task [0_4] Failed to flush state store KTABLE-SOURCE-STATE-STORE-0000000000

Конечно, перехват всех исключений решит проблему, но я ищу правильный механизм обработки исключений для Kafka Streams. Вот мой пример кода:

  private KafkaStreams streams;

    @PostConstruct
    public void runStreams() {
        KStreamBuilder builder = new KStreamBuilder();
        KTable<String, String> snapshotTable = builder.table(kafkaProperties.getInputTopic());
        snapshotTable.toStream().foreach((key, value) -> {
            process(value);
        });

        streams = new KafkaStreams(builder, consumerProps());
        streams.cleanUp();
        streams.start();
        streams.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
            @Override
            public void uncaughtException(Thread t, Throwable e) {
                LOGGER.error("Error while processing:", e);
            }
        });
    }

    private void process(String message) {
        simulateException(message);
        LOGGER.info("Key: {} Value: {}", key, value);
    }

    private void simulateException(String message) {
        if (message.contains("abc")) {
            throw new RuntimeException();
        }
    }

    @PreDestroy
    private void closeStream() {
        streams.close();
    }

    private Properties consumerProps() {
        Properties config = new Properties();
        config.put(StreamsConfig.APPLICATION_ID_CONFIG, "stream-application");
        config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getBootstrap());
        config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 10000);
        config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        config.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 3);
        return config;
    }

(я пробовал с kafka-streams 0.11.0.0 и 1.1.0)

...