Когда серверу не удается подключиться к теме kafka с доступом потребителя, я вижу чрезмерное количество сообщений об ошибках и предупреждений, записанных в журналы.Я вижу сотни журналов в секунду.
Основная проблема заключается в неправильных разрешениях: у меня пока нет доступа к используемому CN.Я нахожусь в процессе исправления этого, но я хочу убедиться, что, если возникнет такая проблема с подключением, я снова не получу гигабайтные логи.
Я попытался добавить ConsumerConfig.RECONNECT_BACKOFF_MS_CONFIG
и ConsumerConfig.RECONNECT_BACKOFF_MAX_MS_CONFIG
к свойствам Consumer Factory, как я видел, но конфиги, похоже, ничего не сделали.
В конце дня я хочу, чтобы количество попыток соединения сужалосьпока вы видите только одну каждые 5 минут в секунду.
Повторяющиеся ошибки:
17:38:33.068 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] ERROR o.s.k.listener.LoggingErrorHandler -
Error while processing: null
org.apache.kafka.common.errors.TopicAuthorizationException: Not authorized to access topics: [payments.transaction.submit]
17:38:33.069 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] WARN o.a.k.c.consumer.internals.Fetcher -
[Consumer clientId=consumer-2, groupId=xxxxx] Not authorized to read from topic payments.transaction.submit.
17:38:33.069 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] ERROR o.s.k.listener.LoggingErrorHandler -
Error while processing: null
org.apache.kafka.common.errors.TopicAuthorizationException: Not authorized to access topics: [payments.transaction.submit]
17:38:33.069 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] WARN o.a.k.c.consumer.internals.Fetcher -
[Consumer clientId=consumer-2, groupId=xxxxx] Not authorized to read from topic payments.transaction.submit.
17:38:33.069 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] ERROR o.s.k.listener.LoggingErrorHandler -
Error while processing: null
org.apache.kafka.common.errors.TopicAuthorizationException: Not authorized to access topics: [payments.transaction.submit]
17:38:33.070 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] WARN o.a.k.c.consumer.internals.Fetcher -
[Consumer clientId=consumer-2, groupId=xxxxx] Not authorized to read from topic payments.transaction.submit.
17:38:33.070 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] ERROR o.s.k.listener.LoggingErrorHandler -
Error while processing: null
Потребитель:
@EnableKafka
@Configuration
public class KafkaConsumerConfig {
~~~~~~~
@Bean
public ConsumerFactory<String, String> consumerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafakBrokerUrl + ":" + kafkaBrokerPort);
props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG, 1000L); // 1 Sec
props.put(ConsumerConfig.RECONNECT_BACKOFF_MS_CONFIG, 10 * 1000L); // 10 Sec
props.put(ConsumerConfig.RECONNECT_BACKOFF_MAX_MS_CONFIG, 300 * 1000L); // 5 Min
props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, kafkaTruststorePath);
props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, kafkaTruststorePW);
props.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, kafkaKeystorePath);
props.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, kafkaKeystorePW);
props.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, kafkaSslKeyPW);
props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL");
return new DefaultKafkaConsumerFactory<>(props);
}
@Bean
public KafkaListenerContainerFactory<?> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
return factory;
}
}
Слушатель Кафки:
@Component
public class Listener {
private final Logger audit = LoggerFactory.getLogger("auditLogger");
@KafkaListener(topics = "bob.the.topic", groupId = "ConsumerGroupBob")
public void listen(@Payload String message) throws Exception {
***** DO STUFF HERE ****
}
}
POM.xml
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.1.7.RELEASE</version>
</parent>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
...