Spring Kafka: подписка на новый шаблон темы во время выполнения - PullRequest
0 голосов
/ 10 июня 2019

Я использую аннотацию @KafkaListener для использования тем в моем приложении.Мне нужно изменить шаблон темы во время выполнения у уже работающего потребителя, чтобы можно было использовать новые темы, соответствующие новому шаблону.

Я пробовал приведенный ниже код, но он по-прежнему использует темы, соответствующие старому шаблону темы,Здесь я установил «старый шаблон темы» при запуске приложения.Затем я обновляю шаблон на «шаблон новой темы» каждые 10 секунд, используя Spring @ Scheduler.

Class "KafkaTopicPatternConfig.java":

@Configuration
public class KafkaTopicPatternConfig {

  @Bean
  public String kafkaTopicPattern(Environment env) {
    logger.info("Getting kafka topic pattern");
    String kafkaTopicPattern = "old-topic-pattern";
    return kafkaTopicPattern;
  }
}



Class "Consumer.java":

@Component
public class Consumer implements ConsumerSeekAware{

  @Autowired
  @Qualifier("kafkaTopicPattern")
  private String kafkaTopicPattern;


  @KafkaListener(topicPattern = "#{kafkaTopicPattern}", id = "s4federatorConsumer")
  public void processMessage(@Payload ConsumerRecord<String, Object> record,
        @Header(KafkaHeaders.OFFSET) Long offset,
        @Header(KafkaHeaders.CONSUMER) KafkaConsumer<String, String> consumer,
        @Header(KafkaHeaders.RECEIVED_PARTITION_ID) Integer partitionId) {

        //do something with the consumed message

  }


  @Scheduled(fixedDelay = 10000, initialDelay = 15000)
  public void refreshKafkaTopics() {
    logger.info("Inside scheduler to refresh kafka topics");
    this.kafkaTopicPattern = "new-topic-pattern";
    this.kafkaListenerEndpointRegistry.getListenerContainer("s4federatorConsumer").stop();
    this.kafkaListenerEndpointRegistry.getListenerContainer("s4federatorConsumer").start();
  }
}

1 Ответ

0 голосов
/ 10 июня 2019

Вы получаете kafkaTopicPattern как -

@Qualifier("kafkaTopicPattern")
private String kafkaTopicPattern;

Я вижу, вы обновляете шаблон как -

this.kafkaTopicPattern = "new-topic-pattern";

Но исходное значение для "kafkaTopicPattern", которое вводится в слушатель, не будет обновлено этим, если эти 2 находятся в разных объектах экземпляра. Поэтому вам необходимо убедиться, что объекты слушателя обновлены с использованием нового шаблона.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...