Spring kafka воссоздает топологию Kafka Stream во время выполнения - PullRequest
1 голос
/ 10 апреля 2020

У меня есть приложение, основанное на Spring Boot, Spring-Kafka и Kafka-Streams. Когда приложение запускается, оно создает топологию потоков kafka со списком тем по умолчанию. Что мне нужно сделать, это изменить / воссоздать топологию во время выполнения. Например, когда приложение уже запущено, появляется новое имя topi c, и я хочу добавить этот topi c в мою топологию. В настоящее время я думаю о том, как удалить существующую топологию, закрыть и очистить KafkaStreams, запустить logi c, где я создаю топологию, но с новым именем topi c и снова запустить KafkaStreams. Я не хочу перезапускать свое приложение. Может кто-нибудь подсказать мне, как это сделать во время выполнения?

1 Ответ

1 голос
/ 10 апреля 2020

Я нашел 1 решение. Я расширяю StreamsBuilderFactoryBean:

@Bean(name = DEFAULT_STREAMS_BUILDER_BEAN_NAME)
@Primary
public StreamsBuilderFactoryBean defaultKafkaStreamsBuilder(KafkaStreamsConfiguration kStreamsConfigs) {
    return new DynamicStreamsBuilderFactoryBean(kStreamsConfigs);
}

public static class DynamicStreamsBuilderFactoryBean extends StreamsBuilderFactoryBean {

    private StreamsBuilder instance;

    public DynamicStreamsBuilderFactoryBean(final KafkaStreamsConfiguration streamsConfig) {
        super(streamsConfig);
    }

    @Override
    public boolean isSingleton() {
        return false;
    }

    @Override
    protected synchronized StreamsBuilder createInstance() {
        if (instance == null) {
            instance = new StreamsBuilder();
        }
        return instance;
    }

    @Override
    public synchronized void stop() {
        instance = null;
        super.stop();
    }
}

И когда я строю топологию, я вместо StreamsBuilder использую StreamsBuilderFactoryBean # getObject ():

@Component

publi c class DynamicStream {

private final StreamsBuilderFactoryBean streamsBuilderFactoryBean;

public void init() {
    StreamsBuilder builder = streamsBuilderFactoryBean.getObject();
        //build topology
}

//call this method when stream reconfiguration is needed
public void reinitialize() {
    streamsBuilderFactoryBean.stop();
    init();
    streamsBuilderFactoryBean.start();
}

}

...