Как прочитать значения заголовка в сценарии обработки ошибок Batch Listener - PullRequest
0 голосов
/ 10 февраля 2020

Я пытаюсь обработать исключение на слушателе

 @KafkaListener(id = PropertiesUtil.ID,
            topics = "#{'${kafka.consumer.topic}'}",
            groupId = "${kafka.consumer.group.id.config}",
            containerFactory = "containerFactory",
            errorHandler = "errorHandler")
    public void receiveEvents(@Payload List<ConsumerRecord<String, String>> recordList,
                              Acknowledgment acknowledgment) {
        try {
            log.info("Consuming the batch of size {} from kafka topic {}", consumerRecordList.size(),
                    consumerRecordList.get(0).topic());
            processEvent(consumerRecordList);
            incrementOffset(acknowledgment);
        } catch (Exception exception) {
            throwOrHandleExceptions(exception, recordList, acknowledgment);
            .........
        }
    }

Конфигурация контейнера Kafka:

   @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>>
    containerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConcurrency(this.numberOfConsumers);
        factory.getContainerProperties().setAckOnError(false);
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL);
        factory.setConsumerFactory(getConsumerFactory());
        factory.setBatchListener(true);
        return factory;
    }
}

обработчик ошибок слушателя impl

@Bean
public ConsumerAwareListenerErrorHandler errorHandler() {
    return (m, e, c) -> {
        MessageHeaders headers = m.getHeaders();
        List<String> topics = headers.get(KafkaHeaders.RECEIVED_TOPIC, List.class); 
        List<Integer> partitions = headers.get(KafkaHeaders.RECEIVED_PARTITION_ID, List.class);
        List<Long> offsets = headers.get(KafkaHeaders.OFFSET, List.class);
        Map<TopicPartition, Long> offsetsToReset = new HashMap<>();
        for (int i = 0; i < topics.size(); i++) {
            int index = i;
            offsetsToReset.compute(new TopicPartition(topics.get(i), partitions.get(i)),
                    (k, v) -> v == null ? offsets.get(index) : Math.min(v, offsets.get(index)));
        }
       ...
    };
}

когда я пытаюсь запустить то же самое без пакетной обработки, то я могу получить раздел, значения topi c и смещения, но когда я включаю пакетную обработку и пытаюсь проверить ее, тогда я получаю только два значения внутри заголовков, т.е. id и отметка времени и другие значения не установлены. Я что-то здесь упускаю ??

1 Ответ

0 голосов
/ 10 февраля 2020

Какую версию вы используете? Я только что протестировал его с Boot 2.2.4 (SK 2.3.5), и он отлично работает ...

@SpringBootApplication
public class So60152179Application {

    public static void main(String[] args) {
        SpringApplication.run(So60152179Application.class, args);
    }


    @KafkaListener(id = "so60152179", topics = "so60152179", errorHandler = "eh")
    public void listen(List<String> in) {
        throw new RuntimeException("foo");
    }

    @Bean
    public ConsumerAwareListenerErrorHandler eh() {
        return (m, e, c) -> {
            System.out.println(m);
            return null;
        };
    }

    @Bean
    public ApplicationRunner runner(KafkaTemplate<String, String> template) {
        return args -> {
            template.send("so60152179", "foo");
        };
    }


    @Bean
    public NewTopic topic() {
        return TopicBuilder.name("so60152179").partitions(1).replicas(1).build();
    }

}
spring.kafka.listener.type=batch
spring.kafka.consumer.auto-offset-reset=earliest

GenericMessage [payload = [foo], headers = {kafka_offset = ] kafka_receivedPartitionId = [0], kafka_receivedTopic = [so60152179], kafka_receivedTimestamp = [1581351585253], kafka_groupId = so60152179}]

...