Как Partial commitSync при потреблении пакета в кафке - PullRequest
0 голосов
/ 26 ноября 2018

Мы серийно потребляем кафку.мы используем X-сообщения, помещаем их в MYSQL, а затем фиксируем их.

Время от времени у нас есть частичные вставки в MYSQL (повторяющиеся записи, другие ошибки и т. д.)

с использованием этого примера из документации.:

List<ConsumerRecord<String, String>> buffer = new ArrayList<>();
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(100);
        for (ConsumerRecord<String, String> record : records) {
            buffer.add(record);
        }
        if (buffer.size() >= minBatchSize) {
            insertIntoDb(buffer);
            consumer.commitSync();
            buffer.clear();
        }

Мы хотим зафиксировать только успешные записи, пока Кафка воспроизводит сбои.

Но я не могу понять, как это сделать, поскольку API получил только commitSync () для всего пакета.

Идеи?

1 Ответ

0 голосов
/ 26 ноября 2018

В Kafka вы не фиксируете определенные записи, т.е. вы не можете пометить смещение N как обработанное и смещение N-1 как не обработанное.Вместо этого, фиксируя смещение N, вы указываете, что обработали все записи до N.

Что вы можете сделать, если не сработаете смещение N:

  • Подтвердить N-1 (используя commitSync(java.util.Map<TopicPartition,OffsetAndMetadata> offsets)) и повторите обработку смещения N, поскольку оно все еще находится в памяти.Только после успешного прохождения N вы фиксируете N и переходите к новым записям.

  • Если вы запустили Sink Connector в Kafka Connect, после сбоя в обработке N вы можете переслать запись вConnect's Deal Letter Queue.В противном случае верните его в другую тему для дальнейшей обработки.Это временно пропускает смещение N (вы также можете сбросить его, если это возможно).

Вы также можете смешать оба варианта, сделать несколько повторных попыток, но если невозможно обработатьсохраните / удалите эту запись и продолжайте обрабатывать новые записи.

...