Фрагмент кода этой логики c поможет.
Вы можете сделать это с помощью Пн go Изменить потоки .
Например, для просмотра изменений сбора, используйте метод Collection.Watch()
-
var collection *mongo.Collection
// specify a pipeline that will only match "insert" events
// specify the MaxAwaitTimeOption to have each attempt wait two seconds for new documents
matchStage := bson.D{{"$match", bson.D{{"operationType", "insert"}}}}
opts := options.ChangeStream().SetMaxAwaitTime(2 * time.Second)
changeStream, err := collection.Watch(context.TODO(), mongo.Pipeline{matchStage}, opts)
if err != nil {
log.Fatal(err)
}
// print out all change stream events in the order they're received
// see the mongo.ChangeStream documentation for more examples of using change streams
for changeStream.Next(context.TODO()) {
fmt.Println(changeStream.Current)
// NewConsumer
}
, а затем создайте нового потребителя или вызывайте .SubscribeTopics()
всякий раз, когда вы обновите свою коллекцию, и она будет соответствовать вашим критериям.