У меня следующая проблема с API kafka.Я настроил своего потребителя с помощью:
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, configuration.batchSize);
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
Затем
while(true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(200));
try {
//do some update in DB in a transaction
consumer.commitSync();
} catch (Exception e) {
}
Я хочу прочитать данные из Kafka, обновить базу данных из этих данных.Но если обновление не удается, я хочу повторить попытку, пока оно не заработает.Поэтому я хотел бы применить транзакцию БД к kafka, то есть, если моя транзакция БД в порядке, переместите указатель kafka, но если он потерпит неудачу, повторите попытку с той же позиции.
В моем коде
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(
работает не так, как ожидалось, это означает «если кафка падает, то перезапустите с зафиксированной позиции».Но когда моя транзакция БД терпит неудачу, даже если я не выполняю commitAsync (), указатель перемещается вперед.
Мой вопрос: существует ли простой способ повернуть позицию указателя Кафки в положение последнейpoll.
Я уже заметил, что в API есть
public void seek(TopicPartition partition,
long offset);
, но для этого нужно вручную вести список разделов с их смещением, я полагаю, есть что-то более простое и элегантное?