Я не хочу фиксировать смещения для тех сообщений, обработка которых не удалась, и я хочу, чтобы они были повторно доставлены для обработки. Я использую spring-kafka 1.2.x и внедрил ConsumerSeekAware в моем слушателе.
@Component
public class Listener implements ConsumerSeekAware {
private static Logger logger = LoggerFactory.getLogger(Listener.class);
private final ThreadLocal<ConsumerSeekCallback> seekCallBack = new ThreadLocal<>();
@KafkaListener(topics = "my-topic", containerFactory = "kafkaManualAckListenerContainerFactory")
public void listen1(ConsumerRecord<String, String> consumerRecord) throws MyCustomException {
logger.info("received: key - " + consumerRecord.key() + " value - " + consumerRecord.value());
// Below code is just to show the issue.Not acknowledging so I can get the same msg again.
boolean should_commit = false;
if(should_commit) {
ack.acknowledge();
}
else {
this.seekCallBack.get().seek(consumerRecord.topic(), consumerRecord.partition() , consumerRecord.offset());
}
}
@Override
public void registerSeekCallback(ConsumerSeekCallback callback) {
logger.info("registerSeekCallback called..");
this.seekCallBack.set(callback);
}
@Override
public void onPartitionsAssigned(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback) {
logger.info("onPartitionsAssigned called..");
}
@Override
public void onIdleContainer(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback) {
logger.info("onIdleContainer called..");
}
}
######### Contaianer config (auto.commit имеет значение false для потребителя)
factory.getContainerProperties().setAckOnError(false);
factory.getContainerProperties().setAckMode(AbstractMessageListenerContainer.AckMode.MANUAL_IMMEDIATE);
Проблема, с которой я сталкиваюсь, заключается в том, что если у меня есть 10 сообщений в разных разделах для какой-либо темы, поэтому я получаю их все по одному, а после получения всех сообщений я получаю последнее сообщение для любого раздела. Я также попробовал SeekToCurrentErrorHandler , который реализован в версии 2.0.x и который отлично работает. но я не могу обновить свою версию kafka. Если я перезапускаю контейнер, я снова получаю все сообщения, и это нормально, но я не хочу останавливать контейнер, если обработка сообщения не удалась.
Итак, мой вопрос: возможно ли получить такое же (точно такое же, без необходимости остановки контейнера) поведение, аналогичное SeekToCurrentErrorHandler в spring-kafka 1.2.x?