Загрузка только автоматически настраивает одну DefaultKafkaConsumerFactory
и DefaultKafkaConsumerFactory
из yml, поэтому все свойства являются общими для всех потребителей. Вот почему мы добавили groupId
(и используем id
, если groupId
не указано); это наиболее распространенное свойство, которое меняется между потребителями.
Конечно, вы можете использовать свойства-заполнители, поэтому groupId = "${group.one}"
будет использовать свойство group.one
из yml.
Чтобы изменить более фундаментальные вещи, такие как сериализаторы / десериализаторы, если вы используете версию более раннюю, чем 2.2.4, вам нужно будет создать несколько фабрик и контейнерных фабрик.
Однако, начиная с версии 2.2.4, теперь вы можете установить любое произвольное свойство потребителя kafka в аннотации KafkaListener
...
/**
* Kafka consumer properties; they will supersede any properties with the same name
* defined in the consumer factory (if the consumer factory supports property overrides).
* <h3>Supported Syntax</h3>
* <p>The supported syntax for key-value pairs is the same as the
* syntax defined for entries in a Java
* {@linkplain java.util.Properties#load(java.io.Reader) properties file}:
* <ul>
* <li>{@code key=value}</li>
* <li>{@code key:value}</li>
* <li>{@code key value}</li>
* </ul>
* {@code group.id} and {@code client.id} are ignored.
* @return the properties.
* @since 2.2.4
* @see org.apache.kafka.clients.consumer.ConsumerConfig
* @see #groupId()
* @see #clientIdPrefix()
*/
String[] properties() default {};
Обратите внимание, что эти свойства представлены в собственном формате с точками Кафки (auto.offset.reset
), а не в свойствах падежа при загрузке или верблюда.
Вот пример из документации :
@KafkaListener(topics = "myTopic", groupId="group", properties= {
"max.poll.interval.ms:60000",
ConsumerConfig.MAX_POLL_RECORDS_CONFIG + "=100"
})
Опять же, значения могут быть заполнителями свойств.
На стороне производителя вам все еще нужно несколько заводов.