Kafka не может обработать все сообщения - Java Spring Boot - PullRequest
1 голос
/ 09 января 2020

У меня есть приложение весенней загрузки (весенняя версия 2.2.2.RELEASE), где я настроил потребителя Kafka, который обрабатывает данные из Kafka и обслуживает несколько веб-сокетов. Подписка на kafka прошла успешно, но не все сообщения от выбранного Kafka topi c обрабатываются потребителем. Немногие сообщения задерживаются, а некоторые пропускаются. Но производитель отсылает данные, которые совершенно гарантированы. Ниже я поделился свойствами конфигурации, которые я использовал.

@Bean
public ConsumerFactory<String, String> consumerFactory() {
    final String BOOTSTRAP_SERVERS = kafkaBootstrapServer;
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.GROUP_ID_CONFIG, consumerGroupId);
    return new DefaultKafkaConsumerFactory<>(props);
}

Есть ли какая-либо конфигурация, которую мне не хватает?

1 Ответ

2 голосов
/ 09 января 2020

Для нового потребителя (никогда не фиксируются смещения для group.id) вы должны установить AUTO_OFFSET_RESET на earliest, чтобы не пропустить ни одной из существующих записей в топи c (по умолчанию latest).

...