Кафкализистер читает сообщения дважды - PullRequest
1 голос
/ 31 марта 2019

Таким образом, с приведенной ниже конфигурацией, когда мы масштабируем загрузочные контейнеры пружины до 10 jvms, количество событий случайным образом больше, чем опубликовано, например, если опубликовано 320000 сообщений, иногда события 320500 и т. Д.

//Consumer container bean 
    private static final int CONCURRENCY = 1;


@Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "topic1");
        props.put("enable.auto.commit", "false");

        //props.put("isolation.level", "read_committed");
        return props;
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {

        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        //factory.getContainerProperties().setAckMode(AbstractMessageListenerContainer.AckMode.RECORD);
        factory.getContainerProperties().setPollTimeout(3000);
        factory.setConcurrency(CONCURRENCY);
        return factory;
    }

//Listener 
    @KafkaListener(id="claimserror",topics = "${kafka.topic.dataintakeclaimsdqerrors}",groupId = "topic1", containerFactory = "kafkaListenerContainerFactory")
    public void receiveClaimErrors(String event,Acknowledgment ack) throws JsonProcessingException {
//save event to table ..
}

Обновлен Представленное ниже изменение теперь работает нормально, я просто добавлю повторную проверку в потребителе, чтобы предотвратить сценарий сбоя потребителя

@Bean
public Map<String, Object> consumerConfigs() {
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "topic1");
    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
    props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 1);
    props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
    props.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, "-1");
    //props.put("isolation.level", "read_committed");
    return props;
}
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...