Как работает AckMode как BATCH с max.poll.interval.ms и enable.auto.commit как false? - PullRequest
0 голосов
/ 15 апреля 2020

Я использую spring-kafka '2.1.7.RELEASE' и пытаюсь понять, как max.poll.interval.ms работает с AckMode в качестве BATCH и enable.auto.commit как 'false'. Вот мои настройки потребителя.

    public Map<String, Object> setConsumerConfigs() {

           Map<String, Object> configs = = new HashMap<>();

           configs.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
           configs.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");

           configs.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, "400000");

           configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer2.class);
           configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer2.class);

           configs.put(ErrorHandlingDeserializer2.KEY_DESERIALIZER_CLASS, stringDeserializerClass);
           configs.put(ErrorHandlingDeserializer2.VALUE_DESERIALIZER_CLASS, kafkaAvroDeserializerClass.getName());

           configs.setPartitionAssignmentStrategyConfig(Collections.singletonList(RoundRobinAssignor.class));

           // Set this to true so that you will have consumer record value coming as your pre-defined contract instead of a generic record
           sapphireKafkaConsumerConfig.setSpecificAvroReader("true");
       }

и вот мои заводские настройки

        @Bean
         public <K,V> ConcurrentKafkaListenerContainerFactory<String, Object> kafkaListenerContainerFactory() {
           ConcurrentKafkaListenerContainerFactory<String, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
           factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(getConsumerConfigs));
           factory.getContainerProperties().setMissingTopicsFatal(false);

           factory.getContainerProperties().setAckMode(AckMode.BATCH);

           factory.setErrorHandler(myCustomKafkaSeekToCurrentErrorHandler);
           factory.setRetryTemplate(retryTemplate());
           factory.setRecoveryCallback(myCustomKafkaRecoveryCallback);
           factory.setStatefulRetry(true);
           return factory;
         }

         public RetryTemplate retryTemplate() {
            RetryTemplate retryTemplate = new RetryTemplate();
            retryTemplate.setListeners(new RetryListener[]{myCustomKafkaRetryListener});
            retryTemplate.setRetryPolicy(myCustomKafkaConsumerRetryPolicy);

            FixedBackOffPolicy backOff = new FixedBackOffPolicy();
            backOff.setBackOffPeriod(1000);
            retryTemplate.setBackOffPolicy(backOff);


            return retryTemplate;
          }

Вот мой потребитель, где я добавил задержку в 2 минуты

@KafkaListener(topics = TestConsumerConstants.CONSUMER_LONGRUNNING_RECORDS_PROCESSSING_TEST_TOPIC
      , clientIdPrefix = "CONSUMER_LONGRUNNING_RECORDS_PROCESSSING"
      , groupId = "kafka-lib-comp-test-consumers")
  public void consumeLongRunningRecord(ConsumerRecord message) throws InterruptedException {
    System.out.println(String.format("\n \n Received message at %s offset %s of partition %s of topic %s with key %s \n\n", DateTime.now(),
        message.offset(), message.partition(), message.topic(), message.key()));

    TimeUnit.MINUTES.sleep(2);

    System.out.println(String.format("\n \n Processing done for the message at %s offset %s of partition %s of topic %s with key %s \n\n", DateTime.now(),
        message.offset(), message.partition(), message.topic(), message.key()));
  }

Теперь я опубликовал 5 сообщений и заметил, что он обработал все записи без проблем. Но если я установил для AckMode значение RECORD, он выдает ошибку ниже, фиксируя смещение после обработки 4-го сообщения, а затем дважды обработал одно и то же сообщение (что ожидается).

Согласно документации spring-kafka, AckMode = BATCH будет фиксировать смещение, когда все записи, возвращенные poll (), будут обработаны.

Теперь мой вопрос: как AckMode изменяет поведение, не вызывая перебалансировки после передачи max.poll.interval.ms? Пожалуйста, помогите мне понять.

Caused by: org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.

Ответы [ 2 ]

0 голосов
/ 15 апреля 2020

Изменение режима подтверждения не повлияет на балансировку; max.poll.interval.ms - это время между вызовами на poll(), которое не изменяется в режиме подтверждения.

Поскольку ваш интервал составляет 6,67 минут; Я ожидал бы, что это потерпит неудачу и в режиме BATCH; возможно, опрос вернул только 2 или 3 записи для этого теста, тогда как 4 или 5 записей были возвращены при тестировании режима RECORD.

Вы можете получить количество записей, возвращаемых опросом, включив ведение журнала DEBUG для org.springframework.kafka.

0 голосов
/ 15 апреля 2020

я думаю, что этот поток поможет прояснить несколько вещей, если не полностью,

Разница между session.timeout.ms и max.poll.interval.ms для Kafka 0.10.0.0 и более поздних версий

...