1) Если вы используете верблюжий вариант <2.22, то вы не можете управлять смещением, это происходит в другом потоке со значением по умолчанию 5 секунд, которое можно изменить.Если вы используете верблюжий вариант> = 2.22, то только вы можете контролировать фиксацию сообщений вручную.Чтобы использовать ручную фиксацию, установите следующие свойства:
autoCommitEnable = false: отключить автоматическую фиксацию смещений, чтобы мы могли использовать ручную фиксацию.allowManualCommit = true: включить фиксацию вручную, дает нам доступ к возможности KafkaManualCommit.Ниже приведен фрагмент кода:
KafkaManualCommit manual =
exchange.getIn().getHeader(KafkaConstants.MANUAL_COMMIT, KafkaManualCommit.class);
if (manual != null) {
LOGGER.info("committing the offset manually");
manual.commitSync();
}
2) В вашем втором вопросе кажется, что вы хотите снова отправить сообщение на kafka для обработки.Но из вашего кода кажется, что вы используете одну и ту же конечную точку для потребителя и производителя.Когда вы хотите создать сообщение в кафке, вам нужно указать «тему», «раздел» и «ключ» сообщения, которого я не вижу в вашем коде.Говоря о дырах в петлях, потому что вы снова помещаете сообщения в kafka, что, если сообщение было повреждено, поэтому вы будете продолжать получать то же исключение и снова помещать то же сообщение n снова обратно в kafka.Я бы предложил повторить сообщение по тому же маршруту.Ниже приведен фрагмент кода:
onException(YourException.class)
.maximumRedeliveries(3) // You can call some method too
.redeliveryDelay(100) // You can call some method too
.onRedelivery(exchange -> {
int retryCount = exchange.getIn().getHeader(Exchange.REDELIVERY_COUNTER, Integer.class);
log.debug("Recoverable exception occurred. Retried {} time " , retryCount);
})
.retryAttemptedLogLevel(LoggingLevel.DEBUG)
.to("someOtherRoute // Probably to error-topic