Конфигурация повтора из упомянутой фабрики делегируется в RetryingMessageListenerAdapter
, логика которого выглядит следующим образом:
public void onMessage(final ConsumerRecord<K, V> record, final Acknowledgment acknowledgment,
final Consumer<?, ?> consumer) {
RetryState retryState = null;
if (this.stateful) {
retryState = new DefaultRetryState(record.topic() + "-" + record.partition() + "-" + record.offset());
}
getRetryTemplate().execute(context -> {
context.setAttribute(CONTEXT_RECORD, record);
switch (RetryingMessageListenerAdapter.this.delegateType) {
case ACKNOWLEDGING_CONSUMER_AWARE:
context.setAttribute(CONTEXT_ACKNOWLEDGMENT, acknowledgment);
context.setAttribute(CONTEXT_CONSUMER, consumer);
RetryingMessageListenerAdapter.this.delegate.onMessage(record, acknowledgment, consumer);
break;
case ACKNOWLEDGING:
context.setAttribute(CONTEXT_ACKNOWLEDGMENT, acknowledgment);
RetryingMessageListenerAdapter.this.delegate.onMessage(record, acknowledgment);
break;
case CONSUMER_AWARE:
context.setAttribute(CONTEXT_CONSUMER, consumer);
RetryingMessageListenerAdapter.this.delegate.onMessage(record, consumer);
break;
case SIMPLE:
RetryingMessageListenerAdapter.this.delegate.onMessage(record);
}
return null;
},
getRecoveryCallback(), retryState);
}
Итак, мы повторим попытку для каждого сообщения. В соответствии с рекомендациями Apache Kafka мы обрабатываем один раздел в одном потоке, поэтому каждая следующая запись в этом разделе не будет обрабатываться до тех пор, пока не будет исчерпана повторная попытка или вызов не будет успешным.
В зависимости от состояния нескольких разделов и конфигурации factory.setConcurrency(KafkaProperties.CONCURRENCY);
, это может быть тот факт, что разные разделы обрабатываются в разных потоках. Поэтому может случиться так, что разные записи из разных разделов будут повторены одновременно. Просто потому, что повторная попытка связана с потоком и стеком вызовов.