Управление включением / отключением потребителей Kafka в Spring Boot - PullRequest
0 голосов
/ 29 января 2019

Я настроил несколько потребителей Kafka в Spring Boot.Вот как выглядит файл kafka.properties (здесь перечислены только конфигурации для одного потребителя):

kafka.topics=
bootstrap.servers=
group.id=
enable.auto.commit=
auto.commit.interval.ms=
session.timeout.ms=
schema.registry.url=
auto.offset.reset=
kafka.enabled=

Вот конфигурация:

@Configuration
@PropertySource({"classpath:kafka.properties"})
public class KafkaConsumerConfig {

    @Autowired
    private Environment env;

    @Bean
    public ConsumerFactory<String, String> pindropConsumerFactory() {
        Map<String, Object> dataRiverProps = new HashMap<>();

        dataRiverProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, env.getProperty("bootstrap.servers"));
        dataRiverProps.put(ConsumerConfig.GROUP_ID_CONFIG, env.getProperty("group.id"));
        dataRiverProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, env.getProperty("enable.auto.commit"));
        dataRiverProps.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, env.getProperty("auto.commit.interval.ms"));
        dataRiverProps.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, env.getProperty("session.timeout.ms"));

        dataRiverProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        dataRiverProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        dataRiverProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, env.getProperty("auto.offset.reset"));

        return new DefaultKafkaConsumerFactory<>(dataRiverProps);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(pindropConsumerFactory());
        return factory;
    }
}

И это потребитель:

@Component
public class KafkaConsumer {

    @Autowired
    private MessageProcessor messageProcessor;


    @KafkaListener(topics = "#{'${kafka.topics}'.split(',')}", containerFactory = "kafkaListenerContainerFactory")
    public void consumeJson(String message) {
        // processing message
    }
}

Могу ли я использовать опору "kafka.enabled", чтобы я мог контролировать создание или, возможно, поиск сообщений этого потребителя?Большое спасибо!

Ответы [ 2 ]

0 голосов
/ 15 мая 2019

Вы можете сделать это, используя свойство autoStartup (true / false) для потребителя, как показано ниже -

@KafkaListener(id = "foo", topics = "Topic1", groupId = "group_id",
        containerFactory = "kafkaListenerContainerFactory",autoStartup = "${listen.auto.start:false}")
public void consume(String message) {
    //System.out.println("Consumed message: " + message);
}
0 голосов
/ 29 января 2019

Чтобы отключить настройку Kafka, вы можете, например:

  1. Annotate KafkaConsumerConfig с помощью

    @ConditionalOnProperty(value = "kafka.enabled", matchIfMissing = true)

  2. Удалить@Component в KafkaConsumer классе и определите его как @Bean в KafkaConsumerConfig.

Для управления поиском сообщений в KafkaConsumer:

  1. Просто получите значение свойства внутри KafkaConsumer @Value("kafka.enabled") private Boolean enabled;

  2. , а затем используйте простой метод if, аннотированный @KafkaListener.

...