Как повторно обработать пакет в Kafka, если произошел сбой с исключением тайм-аута - PullRequest
0 голосов
/ 26 мая 2020

У меня следующая конфигурация в моем Kafka-Producer:

kafkaProducer.linger.ms : 0
kafkaProducer.batch.size : 0
kafkaProducer.buffer.memory : 33554432
kafkaProducer.retries : 3
kafkaProducer.request.timeout.ms : 10000
kafkaProducer.key.serializer : org.apache.kafka.common.serialization.StringSerializer
kafkaProducer.max.block.ms : 10000
kafkaProducer.value.serializer : org.apache.kafka.common.serialization.StringSerializer
kafkaProducer.acks : all

Иногда я получаю следующую ошибку при обработке сообщений kafka:

Failed to send; nested exception is 
org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for 
topic-name-14: 23655 ms has passed since batch creation plus linger time 

Если возникает это исключение, эти данные не не дошел до Kafka topi c, и нет возможности обработать эти данные повторно. Каким должен быть правильный способ сделать это?

В настоящее время я думаю реализовать, как показано ниже:

  1. Сохранять все неудачные события в MongoDB
  2. Использовать Пн go Исходный кафка-коннектор, который использует CD C для обработки данных в отказоустойчивом Topi c
  3. Потребители также начнут слушать этот topi c.
future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {

            @Override
            public void onSuccess(SendResult<String, String> result) {
                // Pushing to Elasticsearch through API
            }

            @Override
            public void onFailure(Throwable ex) {
                // Some loggings and persisting to Mongo
            }
}

Есть ли лучший способ справиться с этим?

...