Как я могу эффективно привязать свой @KafkaListener к ConcurrentKafkaListenerContainerFactory? - PullRequest
0 голосов
/ 29 мая 2020

Я попал в этот сценарий, который мне кажется странным:

Итак, в основном я определил два @KafkaListener в одном классе:

@KafkaListener(id = "listener1", idIsGroup = false, topics = "data1", containerFactory = "kafkaListenerContainerFactory")
    public void receive(){}

@KafkaListener(id = "listener2", idIsGroup = false, topics = "data2", containerFactory = "kafkaListenerContainerFactory2")
    public void receive(){}

Их id, topics, containerFactory разные, и каждый из них полагается на свой ConcurrentKafkaListenerContainerFactory, как определено в другом классе:

@Bean
public ConcurrentKafkaListenerContainerFactory<String, ConsumerRecord> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, ConsumerRecord> factory = new ConcurrentKafkaListenerContainerFactory();
    factory.setConsumerFactory(consumerFactory("group1", "earliest"));
    factory.setAutoStartup(false);
    return factory;
}

@Bean
public ConcurrentKafkaListenerContainerFactory<String, ConsumerRecord> kafkaListenerContainerFactory2() {
    ConcurrentKafkaListenerContainerFactory<String, ConsumerRecord> factory = new ConcurrentKafkaListenerContainerFactory();
    factory.setConsumerFactory(consumerFactory("group2", "latest"));
    factory.setAutoStartup(true);
    return factory;
}

@Bean
public ConsumerFactory<String, ConsumerRecord> consumerFactory(String groupId, String offset) {
    Map<String, Object> config = new HashMap<>();
    // dt is current timestamp in millisecond (epoch)
    config.put(ConsumerConfig.GROUP_ID_CONFIG, groupId + "-" + dt);
    config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, offset);
    // other config omitted
    return new DefaultKafkaConsumerFactory<>(config);
}

Итак, что я ожидаю увидеть (и чего я хочу достичь):

  1. Только listener2 будет запускаться автоматически, потому что factory.setAutoStartup(true)
  2. Listener2 начнется с group.id «group2» и auto.offset.reset «latest»
  3. Позже, когда будет запущен listener1 через некоторый прослушиватель событий он будет начинаться с group.id "group1" и auto.offset.reset "earlist"

Однако фактически гарантирован только 1-й. Listener2 может начинаться с {group2 + latest} или {group1 + early}. И позже, когда listener1 начинает потреблять данные, он просто повторно использует конфигурацию listener2 (я вижу, что тот же идентификатор группы, который содержит временную метку, напечатан дважды в моем журнале)

Мой вопрос: почему идентификатор группы и конфигурация смещения для listener2 выбираются случайным образом, а autoStartup - нет? И почему listener1 будет повторно использовать конфигурацию listener2?

1 Ответ

1 голос
/ 29 мая 2020

Это потому, что consumerFactory является одноэлементным @Bean, и аргументы игнорируются при втором вызове.

Добавление @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE) в фабрику каждый раз получает новый bean-компонент.

Однако вам это не нужно, вы можете просто установить свойство groupId в аннотациях и избежать всего этого дополнительного определения.

Вы также можете контролировать autoStartup в аннотации (начиная с версии 2.2) .

EDIT

Чтобы ответить на вопрос в комментарии ниже ...

groupId = "#{'${group.id}' + T(java.time.Instant).now().toEpochMilli()}"

однако, если вам нужен уникальный идентификатор группы; это надежнее ...

groupId = "#{'${group.id}' + T(java.util.UUID).randomUUID()}"
...