Как потреблять из Кафки тему в нескольких горутинах, используя Сарама? - PullRequest
2 голосов
/ 04 июля 2019

Я использую https://github.com/Shopify/sarama для взаимодействия с Кафкой. У меня есть тема, например, 100 разделов . У меня есть приложение, которое развернуто на 1 хосте . Итак, я хочу использовать из этой темы в нескольких goroutines.

Я вижу этот пример - https://github.com/Shopify/sarama/blob/master/examples/consumergroup/main.go, в котором мы видим, как создать потребителя в определенной группе потребителей.

Итак, мой вопрос заключается в том, должен ли я создать несколько таких потребителей, или в Sarama есть некоторая настройка, в которой я могу настроить необходимое количество потребительских программ.

P.S. Я вижу этот вопрос - https://github.com/Shopify/sarama/issues/140 - но нет ответа, как создать MultiConsumer.

1 Ответ

2 голосов
/ 04 июля 2019

В этом примере показано полностью работающее консольное приложение, которое может использовать все разделы в теме, создавая одну процедуру на раздел:

https://github.com/Shopify/sarama/blob/master/tools/kafka-console-consumer/kafka-console-consumer.go

Это связано в конце потокаВы отправили в своем вопросе.

В основном это создает один потребитель:

c, err := sarama.NewConsumer(strings.Split(*brokerList, ","), config)

Затем получает все разделы для нужной темы:

func getPartitions(c sarama.Consumer) ([]int32, error) {
    if *partitions == "all" {
        return c.Partitions(*topic)
    }
...

Затем для каждого разделаон создает PartitionConsumer и потребляет из каждого раздела в другой программе:

for _, partition := range partitionList {
    pc, err := c.ConsumePartition(*topic, partition, initialOffset)
    ....

    wg.Add(1)
    go func(pc sarama.PartitionConsumer) {
        defer wg.Done()
        for message := range pc.Messages() {
            messages <- message
        }
    }(pc)

}
...