Как иметь несколько групп потребителей kafka в свойствах приложения - PullRequest
1 голос
/ 08 мая 2019

Я использую spring kafka и хотел бы, чтобы в yaml приложения были указаны несколько групповых идентификаторов потребителей, которые я могу использовать в своем классе kafkaListener, где я слушаю несколько тем.

Мои свойства kafka в application.ymlНа данный момент файл выглядит следующим образом:

kafka:
    properties:
      topics:
        topic1: topic1
        topic2: topic2
    bootstrap-servers: server1,server2
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
      retries: 4
    consumer:
      group-id: mygroupid
      auto-offset-reset: latest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer

Есть ли способ иметь несколько идентификаторов groupID в блоке потребителя выше.

В качестве альтернативы, я дал различные идентификаторы groupID в kafkaListener в моем весеннем коде следующим образомЯ не уверен, как настроить остальные свойства, такие как автоматический смещение-сброс, ключ-десериализатор и т. Д .:

 @KafkaListener(topics = "topic1",  groupId = "cd1")
  public void consumeMessage(String message) throws Exception {
    // some code goes here
  }

Пожалуйста, дайте мне знать, как выполнить то, что я пытаюсь сделать, как яЯ новичок в Кафке.

1 Ответ

0 голосов
/ 08 мая 2019

Загрузка только автоматически настраивает одну DefaultKafkaConsumerFactory и DefaultKafkaConsumerFactory из yml, поэтому все свойства являются общими для всех потребителей. Вот почему мы добавили groupId (и используем id, если groupId не указано); это наиболее распространенное свойство, которое меняется между потребителями.

Конечно, вы можете использовать свойства-заполнители, поэтому groupId = "${group.one}" будет использовать свойство group.one из yml.

Чтобы изменить более фундаментальные вещи, такие как сериализаторы / десериализаторы, если вы используете версию более раннюю, чем 2.2.4, вам нужно будет создать несколько фабрик и контейнерных фабрик.

Однако, начиная с версии 2.2.4, теперь вы можете установить любое произвольное свойство потребителя kafka в аннотации KafkaListener ...

/**
 * Kafka consumer properties; they will supersede any properties with the same name
 * defined in the consumer factory (if the consumer factory supports property overrides).
 * <h3>Supported Syntax</h3>
 * <p>The supported syntax for key-value pairs is the same as the
 * syntax defined for entries in a Java
 * {@linkplain java.util.Properties#load(java.io.Reader) properties file}:
 * <ul>
 * <li>{@code key=value}</li>
 * <li>{@code key:value}</li>
 * <li>{@code key value}</li>
 * </ul>
 * {@code group.id} and {@code client.id} are ignored.
 * @return the properties.
 * @since 2.2.4
 * @see org.apache.kafka.clients.consumer.ConsumerConfig
 * @see #groupId()
 * @see #clientIdPrefix()
 */
String[] properties() default {};

Обратите внимание, что эти свойства представлены в собственном формате с точками Кафки (auto.offset.reset), а не в свойствах падежа при загрузке или верблюда.

Вот пример из документации :

@KafkaListener(topics = "myTopic", groupId="group", properties= {
    "max.poll.interval.ms:60000",
    ConsumerConfig.MAX_POLL_RECORDS_CONFIG + "=100"
})

Опять же, значения могут быть заполнителями свойств.

На стороне производителя вам все еще нужно несколько заводов.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...