Я попал в этот сценарий, который мне кажется странным:
Итак, в основном я определил два @KafkaListener
в одном классе:
@KafkaListener(id = "listener1", idIsGroup = false, topics = "data1", containerFactory = "kafkaListenerContainerFactory")
public void receive(){}
@KafkaListener(id = "listener2", idIsGroup = false, topics = "data2", containerFactory = "kafkaListenerContainerFactory2")
public void receive(){}
Их id
, topics
, containerFactory
разные, и каждый из них полагается на свой ConcurrentKafkaListenerContainerFactory
, как определено в другом классе:
@Bean
public ConcurrentKafkaListenerContainerFactory<String, ConsumerRecord> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, ConsumerRecord> factory = new ConcurrentKafkaListenerContainerFactory();
factory.setConsumerFactory(consumerFactory("group1", "earliest"));
factory.setAutoStartup(false);
return factory;
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, ConsumerRecord> kafkaListenerContainerFactory2() {
ConcurrentKafkaListenerContainerFactory<String, ConsumerRecord> factory = new ConcurrentKafkaListenerContainerFactory();
factory.setConsumerFactory(consumerFactory("group2", "latest"));
factory.setAutoStartup(true);
return factory;
}
@Bean
public ConsumerFactory<String, ConsumerRecord> consumerFactory(String groupId, String offset) {
Map<String, Object> config = new HashMap<>();
// dt is current timestamp in millisecond (epoch)
config.put(ConsumerConfig.GROUP_ID_CONFIG, groupId + "-" + dt);
config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, offset);
// other config omitted
return new DefaultKafkaConsumerFactory<>(config);
}
Итак, что я ожидаю увидеть (и чего я хочу достичь):
- Только listener2 будет запускаться автоматически, потому что
factory.setAutoStartup(true)
- Listener2 начнется с
group.id
«group2» и auto.offset.reset
«latest» - Позже, когда будет запущен listener1 через некоторый прослушиватель событий он будет начинаться с
group.id
"group1" и auto.offset.reset
"earlist"
Однако фактически гарантирован только 1-й. Listener2 может начинаться с {group2 + latest} или {group1 + early}. И позже, когда listener1 начинает потреблять данные, он просто повторно использует конфигурацию listener2 (я вижу, что тот же идентификатор группы, который содержит временную метку, напечатан дважды в моем журнале)
Мой вопрос: почему идентификатор группы и конфигурация смещения для listener2 выбираются случайным образом, а autoStartup - нет? И почему listener1 будет повторно использовать конфигурацию listener2?