Мы серийно потребляем кафку.мы используем X-сообщения, помещаем их в MYSQL, а затем фиксируем их.
Время от времени у нас есть частичные вставки в MYSQL (повторяющиеся записи, другие ошибки и т. д.)
с использованием этого примера из документации.:
List<ConsumerRecord<String, String>> buffer = new ArrayList<>();
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records) {
buffer.add(record);
}
if (buffer.size() >= minBatchSize) {
insertIntoDb(buffer);
consumer.commitSync();
buffer.clear();
}
Мы хотим зафиксировать только успешные записи, пока Кафка воспроизводит сбои.
Но я не могу понять, как это сделать, поскольку API получил только commitSync () для всего пакета.
Идеи?