Как получить доступ к MessageHeaders в Kafka Batch Listener - PullRequest
0 голосов
/ 08 января 2019

У меня есть следующий класс автоконфигурации для Кафки:

@Configuration
@EnableKafka
@ConditionalOnClass(KafkaReceiver.class)
@ConditionalOnProperty({"spring.kafka.bootstrap-servers"})
public class KafkaAutoConfiguration<T> { @Value("${spring.kafka.bootstrap-servers}")

    private String bootstrapServers;

    private KafkaProperties kafkaConfig;
    private String groupId;

    @Autowired
    public void setKafkaProperties(KafkaProperties properties) {
        this.kafkaConfig = properties;
    }

    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();

        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
            bootstrapServers);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
            StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                    StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
            org.apache.kafka.common.serialization.ByteArrayDeserializer.class);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaConfig.getGroupId());
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        return props;
    }

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());

        return factory;
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaBatchListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setBatchListener(true);

        return factory;
    }

    @Bean
    public DefaultKafkaHeaderMapper headerMapper(){
        return new DefaultKafkaHeaderMapper();
    }

    @Bean("simpleReceiver")
    @ConditionalOnMissingBean(name = "simpleReceiver")
    @ConditionalOnProperty({"service.kafka.consumer.topics"})
    public KafkaReceiver simpleReceiver() {
        return new KafkaSimpleReceiver();
    }

    @Bean("batchReceiver")
    @ConditionalOnMissingBean(name = "batchReceiver")
    @ConditionalOnProperty({"service.kafka.consumer.batch-topics"})
    public KafkaReceiver batchReceiver() {
        return new KafkaBatchReceiver();
    }
}

Простой компонент-слушатель:

public class KafkaSimpleReceiver implements KafkaReceiver {
    @KafkaListener(topics = "#{'${service.kafka.consumer.topics}'.split(',')}", containerFactory = "kafkaListenerContainerFactory")
    public void receive(ConsumerRecord record, @Headers MessageHeaders headers) throws KafkaException {
    }
}

Бин пакетного слушателя:

public class KafkaBatchReceiver implements KafkaReceiver {

    @KafkaListener(topics = "#{'${service.kafka.consumer.batch-topics}'.split(',')}", containerFactory = "kafkaBatchListenerContainerFactory")
    public void receive(List<ConsumerRecord> records, @Headers MessageHeaders headers) throws KafkaException {

    }
}

Простой слушатель работает нормально, но я получаю следующую ошибку для пакетного слушателя. Как мы можем получить доступ к MessageHeaders в этом случае?

A parameter of type 'List<ConsumerRecord>' must be the only parameter (except for an optional 'Acknowledgment' and/or 'Consumer')  

EDIT

Это то, что я сделал, чтобы преобразовать ConsumerRecord в MessageHeaders

public void receive(List<ConsumerRecord> records) { 
    for(ConsumerRecord record : records) { 
        Map<String, Object> headersList = new HashMap<>(); 
        for(final Header h : record.headers()) { 
            headersList.put(h.key(), new String(h.value())); 
        } 
    MessageHeaders headers = new MessageHeaders(headersList); 
    } 
}

1 Ответ

0 голосов
/ 08 января 2019

Вы можете получить список Message<?>.

@KafkaListener(topics = "so54086076", id = "so54086076")
public void listen(List<Message<?>> records) {
    System.out.println(records.size() + ":" + records);
}

Полезные сообщения сообщения будут ConsumerRecord.value(); другие свойства ConsumerRecord будут в заголовках.

...