Обновление смещения фиксации Kafka после успешной пакетной вставки - PullRequest
0 голосов
/ 27 января 2019

У меня есть потребитель spring-kafka, который читает записи и передает их в кеш. Запланированное задание будет периодически очищать записи в кэше. Я хочу обновить COMMIT OFFSET только после успешного сохранения партии в базе данных. Я попытался передать объект подтверждения в службу кэширования, чтобы вызвать метод подтверждения, как показано ниже.

public class KafkaConsumer {
    @KafkaListener( topicPattern = "${kafka.topicpattern}", containerFactory = "kafkaListenerContainerFactory" )
    public void receive( ConsumerRecord<String, String> record, Acknowledgment acknowledgment ) {
        cacheService.add( record.value(), acknowledgment );
    }
}

public class CacheService {
    // concurrency handling has been left out in favor of readability
    public void add( String record, Acknowledgment acknowledgment ) {
        this.records.add(record);
        this.lastAcknowledgment = acknowledgment;
    }

    public void saveBatch() { //called by scheduled task
        if( records.size() == BATCH_SIZE ) {
            // perform batch insert into database
            this.lastAcknowledgment.acknowledge();
            this.records.clear();
        }
    }
}

AckMode был установлен следующим образом:

factory.getContainerProperties().setAckMode( AbstractMessageListenerContainer.AckMode.MANUAL );

И автоматическая фиксация ложна:

config.put( ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false );

Даже несмотря на то, что метод подтверждения вызывается, смещение фиксации не обновляется. Каков наилучший способ обновления смещения фиксации после сохранения записей?

Я использую spring-kafka 2.1.7. RELEASE.


РЕДАКТИРОВАТЬ : После того, как @GaryRussell подтвердил , что подтверждения, сделанные внешними потоками, выполняются потребительским потоком во время следующего опроса, я перепроверил свой код и обнаружил ошибку в том, как последний объект подтверждения установлен. После исправления смещения фиксации ОБНОВЛЯЕТСЯ, как и ожидалось . Так что эта проблема была решена. Однако я не могу пометить этот вопрос как ответивший.

1 Ответ

0 голосов
/ 27 января 2019

Вот проблема, потребительский поток отвечает за фиксацию смещения.Во время опроса потребительский поток отправит предыдущее пакетное смещение.

Поскольку в вашем случае AUTO_COMMIT имеет значение false и lastAcknowledgment.acknowledge() не подтверждает, смещение не отправляется.

Только односпособ сделать это, как только вы получите записи опроса, выполните задачу Schedule как Async и удержите поток потребителя и отправьте смещение после завершения асинхронной задачи. Проверьте этот ответ для справки answer

Примечание Если удерживать поток потребителя более 5 минут, произойдет перебалансировка здесь

Теперь новый потребитель Java поддерживает сердцебиение отфоновый поток.Существует новая конфигурация max.poll.interval.ms, которая контролирует максимальное время между вызовами опроса, прежде чем потребитель проактивно покинет группу (по умолчанию 5 минут) .Значение конфигурации request.timeout.ms всегда должно быть больше, чем max.poll.interval.ms, поскольку это максимальное время, которое запрос JoinGroup может блокировать на сервере, пока потребитель перебалансируется, поэтому мы изменили его значение по умолчанию чуть выше 5минут.Наконец, значение по умолчанию session.timeout.ms было уменьшено до 10 секунд, а значение по умолчанию max.poll.records было изменено на 500.

Специальное примечание от пружиныkafka> 2.1.5

Подтверждения, сделанные на сторонних потоках, будут выполняться потоком потребителя непосредственно перед следующим опросом. Спасибо @Gary Russell за эту информацию

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...