I am using Spring Boot 2.1.9 with Spring Kafka 2.2.9.
I am getting a warning in the log file saying that an offset commit failed. I am also using a SeekToCurrentErrorHandler so that the error is captured once the retries are exhausted, but sometimes, when the commit fails, the record just keeps being retried.
Here is my configuration class:
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.listener.SeekToCurrentErrorHandler;
import org.springframework.kafka.transaction.ChainedKafkaTransactionManager;
import org.springframework.retry.RetryPolicy;
import org.springframework.retry.backoff.BackOffPolicy;
import org.springframework.retry.backoff.FixedBackOffPolicy;
import org.springframework.retry.policy.SimpleRetryPolicy;
import org.springframework.retry.support.RetryTemplate;

@Configuration
@EnableKafka
public class KafkaReceiverConfig {

    // Kafka Server Configuration
    @Value("${kafka.servers}")
    private String kafkaServers;

    // Group Identifier
    @Value("${kafka.groupId}")
    private String groupId;

    // Kafka Max Retry Attempts
    @Value("${kafka.retry.maxAttempts:5}")
    private Integer retryMaxAttempts;

    // Kafka Max Retry Interval (milliseconds)
    @Value("${kafka.retry.interval:180000}")
    private Long retryInterval;

    // Kafka Concurrency
    @Value("${kafka.concurrency:10}")
    private Integer concurrency;

    // Kafka Poll Timeout
    @Value("${kafka.poll.timeout:100}")
    private Integer pollTimeout;

    // Kafka Consumer Offset
    @Value("${kafka.consumer.auto-offset-reset:earliest}")
    private String offset = "earliest";

    // Logger
    private static final Logger log = LoggerFactory.getLogger(KafkaReceiverConfig.class);

    /**
     * Defines the max number of retry attempts.
     *
     * @return the retry policy; see {@link RetryPolicy}
     */
    @Bean
    public RetryPolicy retryPolicy() {
        SimpleRetryPolicy simpleRetryPolicy = new SimpleRetryPolicy();
        simpleRetryPolicy.setMaxAttempts(retryMaxAttempts);
        return simpleRetryPolicy;
    }

    /**
     * Time before the next retry can happen, in milliseconds.
     *
     * @return the back-off policy; see {@link BackOffPolicy}
     */
    @Bean
    public BackOffPolicy backOffPolicy() {
        FixedBackOffPolicy backOffPolicy = new FixedBackOffPolicy();
        backOffPolicy.setBackOffPeriod(retryInterval);
        return backOffPolicy;
    }

    /**
     * Retry template combining the retry and back-off policies.
     *
     * @return the retry template; see {@link RetryTemplate}
     */
    @Bean
    public RetryTemplate retryTemplate() {
        RetryTemplate retryTemplate = new RetryTemplate();
        retryTemplate.setRetryPolicy(retryPolicy());
        retryTemplate.setBackOffPolicy(backOffPolicy());
        return retryTemplate;
    }

    /**
     * String Kafka listener container factory.
     *
     * @return see {@link KafkaListenerContainerFactory}
     */
    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory(
            ChainedKafkaTransactionManager<String, String> chainedTM, MessageProducer messageProducer) {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setConcurrency(concurrency);
        factory.getContainerProperties().setPollTimeout(pollTimeout);
        factory.getContainerProperties().setSyncCommits(true);
        factory.setRetryTemplate(retryTemplate());
        factory.getContainerProperties().setAckOnError(false);
        factory.getContainerProperties().setTransactionManager(chainedTM);
        factory.setStatefulRetry(true);
        // NOTE: retryMaxAttempts should always be +1 due to a Spring Kafka bug
        SeekToCurrentErrorHandler errorHandler = new SeekToCurrentErrorHandler((record, exception) -> {
            log.warn("failed to process kafka message (retries are exhausted). topic name: " + record.topic()
                    + " value: " + record.value());
            messageProducer.saveFailedMessage(record, exception);
        }, retryMaxAttempts + 1);
        factory.setErrorHandler(errorHandler);
        log.debug("Kafka Receiver Config kafkaListenerContainerFactory created");
        return factory;
    }

    /**
     * String consumer factory.
     *
     * @return see {@link ConsumerFactory}
     */
    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        log.debug("Kafka Receiver Config consumerFactory created");
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    /**
     * Consumer configurations.
     *
     * @return see {@link Map}
     */
    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new ConcurrentHashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaServers);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        // Disable the auto commit if required for testing
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, offset);
        props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
        log.debug("Kafka Receiver Config consumerConfigs created");
        return props;
    }
}
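One thing I am not sure about: consumerConfigs() sets ENABLE_AUTO_COMMIT_CONFIG to true even though the container has a transaction manager and setSyncCommits(true), and the warning below does come from a synchronous auto-commit. If those two commit paths conflict, I assume (unverified) that the line should instead read:

// My assumption, not verified: with a transaction manager on the container,
// the consumer itself should not auto-commit.
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);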
Here is the log:
2019-10-30 15:48:05.907 WARN [xxxxx-component-workflow-starter,,,] 11 --- [nt_create-2-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-4, groupId=fulfillment_create] Synchronous auto-commit of offsets {fulfillment_create-4=OffsetAndMetadata{offset=32, metadata=''}} failed: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.
- Are there any problems with my configuration? I suspect that retryInterval = 180000 ms with retryMaxAttempts = 5 lets a single failing record block poll() for roughly 12 minutes of back-off alone, well over the default max.poll.interval.ms of 5 minutes.
- How do I set the maximum poll interval, session timeout, and so on? Please give an example; my own untested attempt is in the first sketch below.
- How do I configure SeekToCurrentErrorHandler in Spring Kafka 2.2.9 so that it works well (I cannot upgrade Spring Kafka because of other dependencies)? The second sketch below shows what I am considering.
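First sketch, for the second question: additions I am considering for consumerConfigs(). The ConsumerConfig constants are the standard kafka-clients ones; the values are placeholders I picked, not tested recommendations.

// Placeholder values, not tested recommendations.
props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 600000); // allow up to 10 min between poll() calls
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 30000);    // heartbeat-based session timeout
props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 10000); // should be well below session.timeout.ms
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 5);          // smaller batches per poll()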
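Second sketch, for the third question: my current understanding (possibly wrong) is that the blocking RetryTemplate is what keeps the poll loop busy past max.poll.interval.ms, and that in 2.2.x I could instead let SeekToCurrentErrorHandler alone drive redeliveries via seeks, so every attempt goes through a fresh poll(). As far as I can tell the 2.2.x error handler has no back-off support, so the 180-second delay between attempts would be lost.

// Sketch, not verified: SeekToCurrentErrorHandler alone, no RetryTemplate.
// A failed record is re-seeked and redelivered on the next poll(),
// so the poll loop is never blocked by back-off sleeps.
SeekToCurrentErrorHandler errorHandler = new SeekToCurrentErrorHandler((record, exception) -> {
    log.warn("failed to process kafka message (retries are exhausted). topic name: {} value: {}",
            record.topic(), record.value());
    messageProducer.saveFailedMessage(record, exception);
}, retryMaxAttempts + 1); // maxFailures, keeping the +1 from my original code
factory.setErrorHandler(errorHandler);
// factory.setRetryTemplate(...) and factory.setStatefulRetry(true) are removed in this variant.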