Использование StateRestoreListener с привязкой Spring Cloud Kafka Streams - PullRequest
1 голос
/ 19 февраля 2020

Я собираюсь использовать StateRestoreListener с привязкой Spring Cloud Kafka Streams. Мне нужно следить за ходом восстановления отказоустойчивых хранилищ состояний моих приложений. В слиянии https://docs.confluent.io/current/streams/monitoring.html#streams -monitoring-runtime-status есть пример .

Для наблюдения за восстановлением всех хранилищ состояний вы предоставляете вашему приложению экземпляр интерфейса org. apache .kafka.streams.processor.StateRestoreListener. Вы устанавливаете org. apache .kafka.streams.processor.StateRestoreListener, вызывая метод KafkaStreams # setGlobalStateRestoreListener.

Первая проблема - получение потоков Кафки из приложения. Я решил эту проблему с помощью

StreamsBuilderFactoryBean streamsBuilderFactoryBean = context.getBean("&stream-builder-process", StreamsBuilderFactoryBean.class);
KafkaStreams kafkaStreams = streamsBuilderFactoryBean.getKafkaStreams();

Вторая проблема - установка StateRestoreListener на KafkaStreams, потому что я получаю ошибку

java .lang.IllegalStateException: Может устанавливать только GlobalStateRestoreListener в СОЗДАННОЕ состояние. Текущее состояние: RUNNING

Возможно ли использовать StateRestoreListener в связывателе Spring Cloud Kafka Streams? Спасибо

1 Ответ

3 голосов
/ 19 февраля 2020

Вы можете сделать это, используя StreamsBuilderFactoryBeanCustomizer, который дает вам доступ к базовому KafkaStreams объекту. Если вы используете связующие версии 3.0 или выше, это рекомендуемый подход. Например, вы можете указать следующее bean в своем приложении и настроить его с помощью GlobalStateRestoreListener.

@Bean
public StreamsBuilderFactoryBeanCustomizer streamsBuilderFactoryBeanCustomizer() {
    return factoryBean -> {
        factoryBean.setKafkaStreamsCustomizer(new KafkaStreamsCustomizer() {
            @Override
            public void customize(KafkaStreams kafkaStreams) {
                kafkaStreams.setGlobalStateRestoreListener(...);
            }
        });
    };
}

Этот блог содержит более подробную информацию об этой стратегии.

...