У меня следующая конфигурация в моем Kafka-Producer
:
kafkaProducer.linger.ms : 0
kafkaProducer.batch.size : 0
kafkaProducer.buffer.memory : 33554432
kafkaProducer.retries : 3
kafkaProducer.request.timeout.ms : 10000
kafkaProducer.key.serializer : org.apache.kafka.common.serialization.StringSerializer
kafkaProducer.max.block.ms : 10000
kafkaProducer.value.serializer : org.apache.kafka.common.serialization.StringSerializer
kafkaProducer.acks : all
Иногда я получаю следующую ошибку при обработке сообщений kafka:
Failed to send; nested exception is
org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for
topic-name-14: 23655 ms has passed since batch creation plus linger time
Если возникает это исключение, эти данные не не дошел до Kafka topi c, и нет возможности обработать эти данные повторно. Каким должен быть правильный способ сделать это?
В настоящее время я думаю реализовать, как показано ниже:
- Сохранять все неудачные события в MongoDB
- Использовать Пн go Исходный кафка-коннектор, который использует CD C для обработки данных в отказоустойчивом Topi c
- Потребители также начнут слушать этот topi c.
future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
@Override
public void onSuccess(SendResult<String, String> result) {
// Pushing to Elasticsearch through API
}
@Override
public void onFailure(Throwable ex) {
// Some loggings and persisting to Mongo
}
}
Есть ли лучший способ справиться с этим?