Кафка с Java: как перечитать данные - PullRequest
0 голосов
/ 20 декабря 2018

У меня следующая проблема с API kafka.Я настроил своего потребителя с помощью:

props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, configuration.batchSize);
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);

Затем

while(true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(200));
    try {
         //do some update in DB in a transaction
         consumer.commitSync();
    } catch (Exception e) {
    }

Я хочу прочитать данные из Kafka, обновить базу данных из этих данных.Но если обновление не удается, я хочу повторить попытку, пока оно не заработает.Поэтому я хотел бы применить транзакцию БД к kafka, то есть, если моя транзакция БД в порядке, переместите указатель kafka, но если он потерпит неудачу, повторите попытку с той же позиции.

В моем коде

ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(

работает не так, как ожидалось, это означает «если кафка падает, то перезапустите с зафиксированной позиции».Но когда моя транзакция БД терпит неудачу, даже если я не выполняю commitAsync (), указатель перемещается вперед.

Мой вопрос: существует ли простой способ повернуть позицию указателя Кафки в положение последнейpoll.

Я уже заметил, что в API есть

public void seek(TopicPartition partition,
             long offset);

, но для этого нужно вручную вести список разделов с их смещением, я полагаю, есть что-то более простое и элегантное?

1 Ответ

0 голосов
/ 21 декабря 2018

1) Поскольку consumer.poll находится внутри цикла, вы будете продолжать двигаться вперед со смещениями независимо от того, зафиксировали ли вы смещения или нет.Фиксация пригодится только при перезапуске компонента.Т.е. знать положение, откуда потребитель должен начать потреблять.

2) Если вам необходимо перейти к ранее зафиксированному смещению при сбое транзакции с БД, используйте метод поиска в Kafka Consumer.public void seek (раздел TopicPartition, длинное смещение)

3) Для фиксации смещений для отдельных разделов вам необходимо сохранить смещение для каждого раздела в соответствии с тем, что вы упомянули.Я не думаю, что есть другой способ.

Вам может не потребоваться искать ранее зафиксированное смещение при каждом сбое транзакции БД.Возможно, вы захотите приостановить работу своего потребителя и повторить попытку несколько раз, увеличив время ожидания в геометрической прогрессии.

Но чтобы ответить на ваш вопрос о том, как переходить к предыдущему смещению при каждом опросе, проследите за смещением первого сообщения в каждом разделе и, в случае сбоя, в конце цикла,искать смещение, которое у вас есть трек.

...