У меня тут очень странная проблема.Я запустил потребителя Kafka, использующего библиотеку sarama-cluster, чтобы использовать некоторые сообщения из темы kafka.Но сообщения не принимаются потребителем при запуске.
Однако происходит очень странная вещь.Если я начну параллельно с другим потребителем, сообщения внезапно доставляются обоим потребителям.
Я не могу придумать логического объяснения этому.Будем благодарны за любые указатели.
Примечание. Эта проблема возникла после того, как серверы Kafka и Zookeeper были запущены не изящно.
Ниже приведен код потребителя go для использования сообщений, которые не работают.:
if err := consumer.Start(); err != nil {
return err
}
updChan, err := consumer.Consume()
if err != nil {
return err
}
go func() {
for {
select {
case msg, ok := <-updChan:
if !ok {
return
}
var message liveupdater.KafkaMessage
err := json.Unmarshal(msg.Msg, &message)
if err != nil {
fmt.Println(err)
}
err = handleMessaege(message)
if err != nil {
logrus.Println("encountered error:" + err.Error())
}
consumer.MarkProcessed(msg, string(message.Type))
}
}
}()
Ниже приведен код go, где потребитель получает сообщения (единственное отличие между этим и предыдущим кодом - это другой потребитель, работающий параллельно по той же теме).
consumeMessages(config)
if err := consumer.Start(); err != nil {
return err
}
updChan, err := consumer.Consume()
if err != nil {
return err
}
go func() {
for {
select {
case msg, ok := <-updChan:
if !ok {
return
}
var message liveupdater.KafkaMessage
err := json.Unmarshal(msg.Msg, &message)
if err != nil {
fmt.Println(err)
}
err = handleMessaege(message)
if err != nil {
logrus.Println("encountered error:" + err.Error())
}
consumer.MarkProcessed(msg, string(message.Type))
}
}
}()
func consumeMessages(config *rakshak_config.Config) {
kafkaConfig := kafka.Config{Brokers: strings.Split(config.Kafka.Brokers, ",")}
logrus.Println("brokers %s", config.Kafka.Brokers)
hermesConsumer, err := hermes.NewConsumer(hermes.Kafka, []string{config.Kafka.Topic}, kafkaConfig)
if err != nil {
logrus.Println("could not get consumer through hermes %s", err)
}
err = hermesConsumer.Start()
if err != nil {
logrus.Println("could not start consumer through hermes %s", err)
}
conChan, err := hermesConsumer.Consume()
if err != nil {
logrus.Println("not able to start consumer channel %s", err)
}
go func() {
for {
select {
case msg, ok := <-conChan:
if !ok {
logrus.Println("could not consume message")
}
logrus.Println("kafka msg string: %s", string(msg.Msg[:]))
hermesConsumer.MarkProcessed(msg, "")
}
}
}()
Спасибозаранее.