Потребляйте несколько тем в одном слушателе в весенней загрузке кафки - PullRequest
0 голосов
/ 18 сентября 2018

Может ли кто-нибудь представить мне небольшой пример весенней загрузки kafka, где мы можем использовать несколько тем в одном классе слушателей.

Ответы [ 2 ]

0 голосов
/ 23 сентября 2018

Для группы потребителей вы можете использовать следующее:

@ KafkaListener (themes = "topic1,") public void listen (запись @Payload KafkaBinding, заголовки MessageHeaders) выбрасывает ExecutionException, InterruptedException { ......... ......... .. }

Для потребителей, действующих в качестве правопреемника, вы можете использовать следующее:

@ KafkaListener (id = «foo», topicPartitions = {@TopicPartition (topic = «myTopic», partitions = {«1»})}) public void listen (запись @Payload KafkaBinding, заголовки MessageHeaders) выбрасывает ExecutionException, InterruptedException { ......... ......... .. }

0 голосов
/ 19 сентября 2018

application.yml

my:
    kafka:
        conf:
            groupId: myId
            topics: topic1,topicN

слушатель:

@KafkaListener(groupId = "${my.kafka.conf.groupId}", topics = "#{'${my.kafka.conf.topics}'.split(',')}")
public void storeTopicsDataToMongo(
        @Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
        @Header(required = false, name= KafkaHeaders.RECEIVED_MESSAGE_KEY) String key,
        @Payload(required = false) String record)
{
    log.trace(format("Received topic[%s] key[%s] payload[%s]", topic, key, record));
    //your code
}

или вы можете исследовать @ConfigurationProperties и создать бины самостоятельно, что-то вроде:

@Component
@ConfigurationProperties(prefix = "my.kafka.conf")
@Data //=> lombok
public class ConsumerConfigurationProperties {

    private String groupId;
    private List<String> topics;
}
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...