Смотреть создание документов с помощью MongoDB Streams - PullRequest
0 голосов
/ 04 декабря 2018

Я бы хотел использовать потоки MongoDB для запуска события каждый раз, когда в коллекцию вставляется определенный тип данных.

Я уже нашел что-то примерно похожее на то, что я ищу, ноэто работает только для потоков изменений, а не для вставок.

Есть идеи, как мне это сделать?

Я использую Mongodb драйвер с Nodejs чтобы это сделать, мой код будет выглядеть примерно так:

const MongoClient = require('mongodb').MongoClient;

const uri = 'mongodb://localhost:27017/?replicaSet=rs0';
MongoClient.connect(uri, function(err, client) {

const db = client.db('mydb');

// Connect using MongoClient
var filter = [{
    $match: {
        $or: [
            { $or: [{"receipt.receiver": "newdexpocket"}, {"act.account": "newdexpocket"}] }]
    }
}];

var options = { fullDocument: 'updateLookup' };
db.collection('somecollection').watch(filter, options).on('create', data => 
  {
    console.log(data);
  });
});
  • Нужно ли указывать operationType в фильтре?
  • Мне также нужнополучить полный документ, но очевидно, что updateLookup - это не тот инструмент, что мне использовать?
  • Какие варианты я могу использовать для события on?Я использовал create, но я даже не уверен, что он существует, не так ли?

Извините за все эти вопросы, но я изо всех сил пытаюсь найти ответы в официальном документе.

РЕШЕНИЕ:

Остерегайтесь не забывать fullDocument в вашем запросе; -)

function watch_insert(con, db, coll) {
  console.log(new Date() + ' watching: ' + coll);

  const insert_pipeline = [ { $match:
                    { 
                        operationType: 'insert',
                        $or: [
                            { "fullDocument.receipt.receiver": "newdexpocket" },
                            { "fullDocument.act.account": "newdexpocket" }
                        ]
                    }
                }];

  con.db(db).collection(coll).watch(insert_pipeline)
    .on('change', data => {
      console.log(data)
    });
}

async function run(uri) {
    try {
        con = await MongoClient.connect(uri, {"useNewUrlParser": true});
        watch_insert(con, 'EOS', 'action_traces');
    } catch (err) {
        console.log(err);
    }
}

1 Ответ

0 голосов
/ 05 декабря 2018

Вам необходимо:

  1. Указать operationType: 'insert'.Поскольку вы не хотите отслеживать обновления, вам не нужно updateLookup.
  2. Создать правильный конвейер агрегации для вашего фильтра, включающий operationType.
  3. Конвейер агрегации фильтрует документы, возвращаемые watch().Пример вывода на странице Изменить события .

watch() возвращает ChangeStream.Он запускает события close, change, end и error.См. ChangeStream для получения более подробной информации.

Вот полный пример changetream, который прослушивает insert операцию с базой данных test collection test.Он будет выводить документы с полем {a: 1} ('fullDocument.a': 1) и игнорировать обновления, вставки других значений a или что-либо без поля a.

const MongoClient = require('mongodb').MongoClient
const uri = 'mongodb://localhost:27017/test?replicaSet=replset'

const insert_pipeline = [
  {$match: {operationType: 'insert', 'fullDocument.a': 1}}
]

function watch_insert(con, db, coll) {
  console.log(new Date() + ' watching: ' + coll)
  con.db(db).collection(coll).watch(insert_pipeline)
    .on('change', data => {
      console.log(data)
    })
}

async function run() {
  con = await MongoClient.connect(uri, {"useNewUrlParser": true})
  watch_insert(con, 'test', 'test')
}

run()
.
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...