Счета потребителей Kafka Spring не выполняются с ConsumerRecordRecoverer - PullRequest
0 голосов
/ 16 июня 2020

Технические данные:

Версии:

spring-boot : 2.2.2.RELEASE
spring-kafka : 2.3.7.RELEASE
kafka broker : 2.3.1 (via amazon MSK)

Реквизит:

auto.offset.reset: earliest
enable.auto.commit: false
isolation.level: read_committed

Проблема и поведение:

У меня KafkaListener с использованием ConcurrentKafkaListenerContainerFactory, сконфигурированного с пользовательской реализацией ConsumerRecordRecoverer. Я заметил, что когда этот контейнер восстанавливается после некоторого исключения, смещения потребителя для указанного восстановленного сообщения не фиксируются. Смещения фиксируются только тогда, когда сообщение успешно обработано (т.е. без восстановления). Однако приемник / потребитель / контейнер, похоже, сохраняет реальное смещение в памяти, поскольку будет продвигаться мимо восстановленного сообщения, пока это приложение остается работающим.

Это вызовет проблему, если приложение весенней загрузки будет перезапущено, когда последнее сообщение не было успешно обработано, и возобновит работу с последнего фактически зафиксированного смещения, вероятно, повторно обрабатывая сообщения, которые были восстановлены, но чьи смещения не были зафиксированы.

Я подтвердил это локальным тестом на пустом топи c.

  1. До: смещение группы потребителей было 0 для раздела 0 в kafka
  2. Вставлено сообщение, которое приводит к исключению прослушивателя и восстановлению.
  3. После: Группа потребителей смещение осталось 0 для раздела 0, теперь с запаздыванием.

На этом этапе я предполагаю, что мне не хватает какой-то критической конфигурации или установщика для артефактов пружины, но мне не ясно, что такое отсутствует. Я предполагал, что это будет целью использования от DefaultAfterRollbackProcessor#setCommitRecovered до true.

Примеры кода

KafkaConfiguration

@Configuration
public class KafkaConfig {


  @Bean
  ConsumerRetryConfig retryConfig() {
    return new ConsumerRetryConfig();
  }

  @Bean
  public RetryTemplate consumerRetryTemplate(ConsumerRetryConfig consumerRetryConfig) {
    RetryTemplate retryTemplate = new RetryTemplate();

    FixedBackOffPolicy fixedBackOffPolicy = new FixedBackOffPolicy();
    fixedBackOffPolicy.setBackOffPeriod(consumerRetryConfig.getRetryWaitInterval());
    retryTemplate.setBackOffPolicy(fixedBackOffPolicy);

    SimpleRetryPolicy retryPolicy = new SimpleRetryPolicy();
    retryPolicy.setMaxAttempts(consumerRetryConfig.getMaxRetries());
    retryTemplate.setRetryPolicy(retryPolicy);

    return retryTemplate;
  }

  @Bean
  @Lazy
  FiniteRequeueingRecovererConfig finiteRequeueingRecovererConfig() {
    return new FiniteRequeueingRecovererConfig();
  }

  @Bean
  @Lazy
  FiniteRequeueingRecordRecoverer finiteRequeueingRecordRecoverer(
    KafkaTemplate<String, SpecificRecord> kafkaTemplate,
    FiniteRequeueingRecovererConfig finiteRequeueingRecovererConfig
  ) {
    return new FiniteRequeueingRecordRecoverer(kafkaTemplate, finiteRequeueingRecovererConfig.getMaxRequeues());
  }

  @Bean
  @Lazy
  DefaultAfterRollbackProcessor finiteRequeueingRollbackProcessor(
    FiniteRequeueingRecordRecoverer finiteRequeueingRecordRecoverer,
    ConsumerRetryConfig consumerRetryConfig
  ) {
    DefaultAfterRollbackProcessor ret = new DefaultAfterRollbackProcessor(
      finiteRequeueingRecordRecoverer,
      new FixedBackOff(
        consumerRetryConfig.getRetryWaitInterval(),
        consumerRetryConfig.getMaxRetries()
      )
    );
    ret.setCommitRecovered(true);
    return ret;
  }

  @Bean
  public ProducerFactory<String, SpecificRecord> avroMessageProducerFactory(KafkaProperties kafkaProperties) {
    Map<String, Object> props = MapBuilder.<String, Object>builder()
      .putAll(kafkaProperties.buildProducerProperties())
      .put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, UUID.randomUUID().toString())
      .build();

    return (kafkaAvroSerializer==null) ?
      new DefaultKafkaProducerFactory<>(props) :
      new DefaultKafkaProducerFactory(props, new StringSerializer(), kafkaAvroSerializer);
  }

  @Bean
  public KafkaTemplate<String, SpecificRecord> avroMessageKafkaTemplate(ProducerFactory<String, SpecificRecord> avroMessageProducerFactory) {
    return new KafkaTemplate<>(avroMessageProducerFactory);
  }

  @Bean
  public KafkaTransactionManager<?,?> kafkaTransactionManager(ProducerFactory<String, SpecificRecord> avroMessageProducerFactory) {
    return new KafkaTransactionManager<>(avroMessageProducerFactory);
  }

  @Bean
  public ConcurrentKafkaListenerContainerFactory<?, ?> finiteRequeueingKafkaListenerContainerFactory(
    ConsumerFactory<Object, Object> consumerFactory,
    ConcurrentKafkaListenerContainerFactoryConfigurer configurer,
    KafkaTransactionManager<Object, Object> kafkaTransactionManager,
    DefaultAfterRollbackProcessor finiteRequeueingRollbackProcessor
  ) {

    ConcurrentKafkaListenerContainerFactory<Object, Object> factory =
      new ConcurrentKafkaListenerContainerFactory<>();
    configurer.configure(factory, consumerFactory);
    factory.getContainerProperties().setTransactionManager(kafkaTransactionManager);

    factory.setStatefulRetry(true);
    factory.setAfterRollbackProcessor(finiteRequeueingRollbackProcessor);

    return factory;
  }

  @KafkaListener(
    id = "${some.listener-id}",
    topics = "${some.topic}",
    groupId = "${some.group-id}",
    containerFactory = "finiteRequeueingKafkaListenerContainerFactory"
  )
  public void consume(
    @Payload WebhookNotificationMessage message,
    @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) String key,
    @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
    @Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
    @Header(KafkaHeaders.RECEIVED_TIMESTAMP) long ts
  ) throws Exception {

    // Do the thing, maybe throw an exception

  }

}

FiniteRequeueingRecordRecoverer

public class FiniteRequeueingRecordRecoverer implements ConsumerRecordRecoverer {
  private final Logger logger = LoggerLike.getLogger(FiniteRequeueingRecordRecoverer.class);

  private KafkaTemplate<String, SpecificRecord> kafkaTemplate;
  private Integer maxRequeues;

  public FiniteRequeueingRecordRecoverer(KafkaTemplate<String, SpecificRecord> kafkaTemplate, Integer maxRequeues) {
    this.kafkaTemplate = kafkaTemplate;
    this.maxRequeues = maxRequeues;
  }

  @Override
  public void accept(ConsumerRecord<?, ?> consumerRecord, Exception e) {

    // Not sure the substance of this recoverer is relevant...but if so
    // If the retry number in the avro record is < this.maxRequeues
    //   then increment the retries and re enqueue this message, move on
    // If retries have been exhausted, do not requeue and send to a dead letter or just abandon
  }
}

1 Ответ

0 голосов
/ 17 июня 2020

Для DefaultAfterRollbackProcessor требуется KafkaTemplate для отправки смещения в новую транзакцию.

Вероятно, нам следует зарегистрировать предупреждение, если commitRecovered истинно и нет KT.

...