Динамическое добавление тем Kafka для использования без перезапуска моего приложения GoLang - PullRequest
0 голосов
/ 22 апреля 2020

У меня есть приложение Golang, которое в качестве отправной точки использует потребителей Kafka. Я получаю список тем для прослушивания из MongoDB во время выполнения. Однако каждый раз, когда мне нужно добавить новую topi c для прослушивания, после добавления в Mon go я должен перезапустить все приложение Golang. Потребитель находится в основном файле. Я использую Confluent в качестве клиента. Есть ли способ добавить больше тем для использования без перезапуска приложения?

Ответы [ 2 ]

1 голос
/ 22 апреля 2020

Вы пробовали потреблять топи c с regular expressions.

Пример:

consume, err := kafka.NewConsumer(&kafka.ConfigMap{
                     "bootstrap.servers":  "server",
})

err = consume.SubscribeTopics([]string{"^.*_mypattern"}, nil)

Источник: https://github.com/confluentinc/confluent-kafka-go/issues/96

Также попробуйте установить эту опцию при инициализации потребителя metadata.max.age.ms. Это обновит метаданные sh, чтобы увидеть, доступны ли новые темы.

0 голосов
/ 22 апреля 2020

Фрагмент кода этой логики c поможет.

Вы можете сделать это с помощью Пн go Изменить потоки .
Например, для просмотра изменений сбора, используйте метод Collection.Watch() -

var collection *mongo.Collection

// specify a pipeline that will only match "insert" events
// specify the MaxAwaitTimeOption to have each attempt wait two seconds for new documents
matchStage := bson.D{{"$match", bson.D{{"operationType", "insert"}}}}
opts := options.ChangeStream().SetMaxAwaitTime(2 * time.Second)
changeStream, err := collection.Watch(context.TODO(), mongo.Pipeline{matchStage}, opts)
if err != nil {
    log.Fatal(err)
}

// print out all change stream events in the order they're received
// see the mongo.ChangeStream documentation for more examples of using change streams
for changeStream.Next(context.TODO()) {
    fmt.Println(changeStream.Current)
    // NewConsumer
}

, а затем создайте нового потребителя или вызывайте .SubscribeTopics() всякий раз, когда вы обновите свою коллекцию, и она будет соответствовать вашим критериям.

...