Я использую sarama-cluster
lib для создания потребителя группы kafka в бэкэнд-сервисе. Этот пример кода из godo c работает:
for {
if msg, ok := <-consumer.Messages(); ok {
fmt.Fprintf(os.Stdout, "%s/%d/%d\t%s\t%s\n", msg.Topic, msg.Partition, msg.Offset, msg.Key, msg.Value)
consumer.MarkOffset(msg, "") // mark message as processed
}
}
, так как это мертвый l oop, я положил его в программу, чтобы избежать блокировки других действий, тогда он не может больше не использовать любое сообщение:
go func() {
for msg := range consumer.Messages() {
fmt.Fprintf(os.Stdout, "%s/%d/%d\t%s\t%s\n", msg.Topic, msg.Partition, msg.Offset, msg.Key, msg.Value)
consumer.MarkOffset(msg, "") // mark message as processed
}
}()
(служба работает, поэтому эта процедура не прекращается. Она просто не в состоянии потреблять)
Есть идеи, как решить эту проблему?