Деформация сарама-кластера, потребляющая действие в процедуре, затем не может ничего потреблять - PullRequest
0 голосов
/ 16 апреля 2020

Я использую sarama-cluster lib для создания потребителя группы kafka в бэкэнд-сервисе. Этот пример кода из godo c работает:

for {
    if msg, ok := <-consumer.Messages(); ok {
        fmt.Fprintf(os.Stdout, "%s/%d/%d\t%s\t%s\n", msg.Topic, msg.Partition, msg.Offset, msg.Key, msg.Value)
        consumer.MarkOffset(msg, "") // mark message as processed
    }
}

, так как это мертвый l oop, я положил его в программу, чтобы избежать блокировки других действий, тогда он не может больше не использовать любое сообщение:

go func() {
    for msg := range consumer.Messages() {
        fmt.Fprintf(os.Stdout, "%s/%d/%d\t%s\t%s\n", msg.Topic, msg.Partition, msg.Offset, msg.Key, msg.Value)
        consumer.MarkOffset(msg, "") // mark message as processed
    }
}()

(служба работает, поэтому эта процедура не прекращается. Она просто не в состоянии потреблять)

Есть идеи, как решить эту проблему?

1 Ответ

0 голосов
/ 16 апреля 2020

sarama-cluster не поддерживается уже довольно давно со следующим уведомлением :

Обратите внимание, что с момента объединения https://github.com/Shopify/sarama/pull/1099 и release (> = v1.19.0) эта библиотека официально устарела. Собственная реализация поддерживает множество вариантов использования, которые недоступны в этой библиотеке.

Я предлагаю вам использовать github.com / Shopify / sarama . Он имеет все функции sarama-cluster и активно поддерживается.

Вы можете следовать простому примеру группы потребителей из их хранилища .

...