Как включить откат Kafka для потребителя Springframework Kafka? - PullRequest
0 голосов
/ 20 сентября 2019

Когда серверу не удается подключиться к теме kafka с доступом потребителя, я вижу чрезмерное количество сообщений об ошибках и предупреждений, записанных в журналы.Я вижу сотни журналов в секунду.

Основная проблема заключается в неправильных разрешениях: у меня пока нет доступа к используемому CN.Я нахожусь в процессе исправления этого, но я хочу убедиться, что, если возникнет такая проблема с подключением, я снова не получу гигабайтные логи.

Я попытался добавить ConsumerConfig.RECONNECT_BACKOFF_MS_CONFIGи ConsumerConfig.RECONNECT_BACKOFF_MAX_MS_CONFIG к свойствам Consumer Factory, как я видел, но конфиги, похоже, ничего не сделали.

В конце дня я хочу, чтобы количество попыток соединения сужалосьпока вы видите только одну каждые 5 минут в секунду.

Повторяющиеся ошибки:

17:38:33.068 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] ERROR o.s.k.listener.LoggingErrorHandler -                                                                                          
                Error while processing: null                                                                                                                                                                        
org.apache.kafka.common.errors.TopicAuthorizationException: Not authorized to access topics: [payments.transaction.submit]                                                                                          
17:38:33.069 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] WARN  o.a.k.c.consumer.internals.Fetcher -                                                                                          
                [Consumer clientId=consumer-2, groupId=xxxxx] Not authorized to read from topic payments.transaction.submit.                                                                              
17:38:33.069 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] ERROR o.s.k.listener.LoggingErrorHandler -                                                                                          
                Error while processing: null                                                                                                                                                                        
org.apache.kafka.common.errors.TopicAuthorizationException: Not authorized to access topics: [payments.transaction.submit]                                                                                          
17:38:33.069 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] WARN  o.a.k.c.consumer.internals.Fetcher -                                                                                          
                [Consumer clientId=consumer-2, groupId=xxxxx] Not authorized to read from topic payments.transaction.submit.                                                                              
17:38:33.069 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] ERROR o.s.k.listener.LoggingErrorHandler -                                                                                          
                Error while processing: null                                                                                                                                                                        
org.apache.kafka.common.errors.TopicAuthorizationException: Not authorized to access topics: [payments.transaction.submit]                                                                                          
17:38:33.070 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] WARN  o.a.k.c.consumer.internals.Fetcher -                                                                                          
                [Consumer clientId=consumer-2, groupId=xxxxx] Not authorized to read from topic payments.transaction.submit.                                                                              
17:38:33.070 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] ERROR o.s.k.listener.LoggingErrorHandler -                                                                                          
                Error while processing: null

Потребитель:

@EnableKafka
@Configuration
public class KafkaConsumerConfig {

    ~~~~~~~

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafakBrokerUrl + ":" + kafkaBrokerPort);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG, 1000L); // 1 Sec
        props.put(ConsumerConfig.RECONNECT_BACKOFF_MS_CONFIG, 10 * 1000L); // 10 Sec
        props.put(ConsumerConfig.RECONNECT_BACKOFF_MAX_MS_CONFIG, 300 * 1000L); // 5 Min

        props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, kafkaTruststorePath);
        props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, kafkaTruststorePW);
        props.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, kafkaKeystorePath);
        props.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, kafkaKeystorePW);
        props.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, kafkaSslKeyPW);
        props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL");

        return new DefaultKafkaConsumerFactory<>(props);
    }

    @Bean
    public KafkaListenerContainerFactory<?> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }
}

Слушатель Кафки:

@Component
public class Listener {
    private final Logger audit = LoggerFactory.getLogger("auditLogger");

    @KafkaListener(topics = "bob.the.topic", groupId = "ConsumerGroupBob")
    public void listen(@Payload String message) throws Exception {
        ***** DO STUFF HERE ****
    }
}

POM.xml

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.1.7.RELEASE</version>
    </parent>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-actuator</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>
...
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...