У меня есть приложение весенней загрузки (весенняя версия 2.2.2.RELEASE), где я настроил потребителя Kafka, который обрабатывает данные из Kafka и обслуживает несколько веб-сокетов. Подписка на kafka прошла успешно, но не все сообщения от выбранного Kafka topi c обрабатываются потребителем. Немногие сообщения задерживаются, а некоторые пропускаются. Но производитель отсылает данные, которые совершенно гарантированы. Ниже я поделился свойствами конфигурации, которые я использовал.
@Bean
public ConsumerFactory<String, String> consumerFactory() {
final String BOOTSTRAP_SERVERS = kafkaBootstrapServer;
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.GROUP_ID_CONFIG, consumerGroupId);
return new DefaultKafkaConsumerFactory<>(props);
}
Есть ли какая-либо конфигурация, которую мне не хватает?