Ищите метод поведения весной у кафки потребителя 1.2.х - PullRequest
0 голосов
/ 10 мая 2018

Я не хочу фиксировать смещения для тех сообщений, обработка которых не удалась, и я хочу, чтобы они были повторно доставлены для обработки. Я использую spring-kafka 1.2.x и внедрил ConsumerSeekAware в моем слушателе.

@Component
public class Listener implements ConsumerSeekAware {

    private static Logger logger = LoggerFactory.getLogger(Listener.class);

    private final ThreadLocal<ConsumerSeekCallback> seekCallBack = new ThreadLocal<>();

    @KafkaListener(topics = "my-topic", containerFactory = "kafkaManualAckListenerContainerFactory")
    public void listen1(ConsumerRecord<String, String> consumerRecord) throws MyCustomException {
        logger.info("received: key - " + consumerRecord.key() + " value - " + consumerRecord.value());

        // Below code is just to show the issue.Not acknowledging so I can get the same msg again.
        boolean should_commit = false;
        if(should_commit) {
            ack.acknowledge();
        }
        else {
            this.seekCallBack.get().seek(consumerRecord.topic(), consumerRecord.partition() , consumerRecord.offset());
        }
    }

    @Override
    public void registerSeekCallback(ConsumerSeekCallback callback) {
        logger.info("registerSeekCallback called..");
        this.seekCallBack.set(callback);
    }

    @Override
    public void onPartitionsAssigned(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback) {
     logger.info("onPartitionsAssigned called..");
    }

    @Override
    public void onIdleContainer(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback) {
        logger.info("onIdleContainer called..");
    }
}
######### Contaianer config (auto.commit имеет значение false для потребителя)
factory.getContainerProperties().setAckOnError(false);
          factory.getContainerProperties().setAckMode(AbstractMessageListenerContainer.AckMode.MANUAL_IMMEDIATE);

Проблема, с которой я сталкиваюсь, заключается в том, что если у меня есть 10 сообщений в разных разделах для какой-либо темы, поэтому я получаю их все по одному, а после получения всех сообщений я получаю последнее сообщение для любого раздела. Я также попробовал SeekToCurrentErrorHandler , который реализован в версии 2.0.x и который отлично работает. но я не могу обновить свою версию kafka. Если я перезапускаю контейнер, я снова получаю все сообщения, и это нормально, но я не хочу останавливать контейнер, если обработка сообщения не удалась.

Итак, мой вопрос: возможно ли получить такое же (точно такое же, без необходимости остановки контейнера) поведение, аналогичное SeekToCurrentErrorHandler в spring-kafka 1.2.x?

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...