Как подписаться на несколько тем с помощью аннотации @KafkaListner - PullRequest
1 голос
/ 03 июля 2019

Я использую брокер сообщений Kafka для публикации и подписки на событие.Для этого используется весенняя инфраструктура.Мое требование - мне нужно создать одного потребителя, который будет подписываться на несколько тем.

Ниже приведен код, который отлично работает при подписке на одну тему.

@KafkaListener(topics = "com.customer.nike")
  public void receive(String payload) {
    LOGGER.info("received payload='{}'", payload);
  }

Но я хочу, чтобы он был подписан на какой-то шаблон темы.как ..

   @KafkaListener(topics = "com.cusotmer.*.nike")
      public void receive(String payload) {
        LOGGER.info("received payload='{}'", payload);
      }

В этом коде * будет меняться.Это может быть некоторое числовое значение, например 1000. 1010 и т. Д.Для этого я также использовал SpeL.

   @KafkaListener(topics = "#{com.cusotmer.*.nike}")
      public void receive(String payload) {
        LOGGER.info("received payload='{}'", payload);
      }

Но этот также не работает для меня.Может ли кто-нибудь помочь мне подписаться на несколько тем.

Заранее спасибо.

Ответы [ 2 ]

1 голос
/ 04 июля 2019

Что касается подписки на несколько тем , вы можете использовать topicPatterns для достижения этого:

Шаблон темы для этого слушателя.Записи могут быть «шаблон темы», «ключ-заполнитель свойства» или «выражение».Фреймворк создаст контейнер, который подписывается на все темы, соответствующие указанному шаблону, для получения динамически назначаемых разделов.Сопоставление с образцом будет выполняться периодически по темам, существующим на момент проверки.Выражение должно быть преобразовано в шаблон темы (поддерживаются типы результатов String или Pattern).

Взаимоисключающие с topic () и topicPartitions ().

@KafkaListener(topicPattern = "com.customer.*")
  public void receive(String payload) {
    LOGGER.info("received payload='{}'", payload);
  }

Что касается программного доступа к имени темы , вы можете использовать @Headerаннотированный метод для извлечения определенного значения заголовка, определенного KafkaHeaders , который в вашем случае равен RECEIVED_TOPIC :

Заголовок, содержащий тему, из которой сообщениебыл получен.

@KafkaListener(topics = "com.customer.nike")
    public void receive(String payload, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
    LOGGER.info("received payload='{}'", payload);
    LOG.info("received from topic: {}", topic);
    }
0 голосов
/ 04 июля 2019

Я использую @KafkaListener(topics = "#{'${kafka.topics}'.split(',')}", где kafka.topics берется из моего файла свойств и содержит разделенные запятыми темы, которые мой слушатель должен слушать.

Но может быть, во время запуска вы можете добавить логику для генерации всех возможных тем и присвоения переменной, которая впоследствии может быть использована, как указано выше.

Обновление: подстановочный знак возможен, как Александр прокомментировал ниже.

...