Сарама не может создать сообщение для Amazon MSK версии 2.3.1 - PullRequest
0 голосов
/ 18 февраля 2020

Я использовал sarama golang библиотека для отправки сообщений на Amazon MSK . До сих пор я использовал msk версии 2.2.1, мой код работал нормально, но теперь версия msk была изменена на 2.3.1. Теперь я не могу отправить sh сообщение в Topi c.

Ошибка:

Раздел -1

Смещение -1

Запрос был для topi c или раздела, который не существует в этом брокере.

Код:

func getKafkaEventClient() (sarama.Client, error) {

    if !setupDone {
        return nil, errors.New("Invalid setup")
    }

    if kafkaEventClient != nil {
        return kafkaEventClient, nil
    }

    err := initKafkaEventClient()
    if err != nil {
        return nil, err
    }

    return kafkaEventClient, nil
}

func initKafkaEventClient() (err error) {
      config := sarama.NewConfig()
      config.Net.TLS.Enable = false
      config.Producer.Return.Successes = true
      config.Version = sarama.V0_10_0_0

      brokers := strings.Split(kafkaEventHost, ",") //split the host into brokers

      kafkaEventClient, err = sarama.NewClient(brokers, config)
      if err != nil {
         log.Println("initKafkaClient: failed to create new kafka client", err)
         return
      }
}

func PushMessageToKafka(message string) {
    client, err := getKafkaEventClient()
    if err != nil {
        return
    }

    producer, err := sarama.NewSyncProducerFromClient(kafkaEventClient)
    if err != nil {
    fmt.Println("PushMessageToKafka: failed to get producer", err)
    return
    }
    var msg sarama.ProducerMessage
    msg.Topic = "some_topic"
    msg.Value = sarama.StringEncoder("some_message")
    p, o, err := producer.SendMessage(&msg)

    fmt.Println("Partition", p)
    fmt.Println("Offset", o)

    if err != nil {
        fmt.Println("PushMessageToKafka: failed to push message to be displayed", err)
     }
}

Я также изменил версию sarama на maxVersion config.Version = sarama.MaxVersion, но она не работает для Amazon MSK 2.3.1.

Пожалуйста, предоставьте какое-нибудь решение.

1 Ответ

0 голосов
/ 23 апреля 2020

Я нашел решение после отладки так много раз. Это была не проблема версии, фактически, код, который возвращает клиента

func getKafkaEventClient() (sarama.Client, error) {

    if !setupDone {
        return nil, errors.New("Invalid setup")
    }

    if kafkaEventClient != nil {
        return kafkaEventClient, nil
    }

    err := initKafkaEventClient()
    if err != nil {
        return nil, err
    }

    return kafkaEventClient, nil
}

Здесь if kafkaEventClient != nil, а затем возвращает предыдущего клиента, что неверно. Для каждого клиента, если меняется брокер / хост, нам нужно создать нового клиента, и этот клиент сможет найти топи c, в которых мы хотим получить sh наше сообщение. Если мы получаем старый клиент и отправляем сообщение в topi c, который существует в другом брокере / хосте, тогда мы получим ошибку, как я упоминал выше.

Ошибка:

Раздел -1

Смещение -1

Запрос был для topi c или раздела, который не существует в этом брокере.

Надеюсь, это решит проблему кого-то, кто сталкивается с той же проблемой.

...