MongoDB меняет время ожидания потока, если база данных не работает в течение некоторого времени - PullRequest
0 голосов
/ 10 февраля 2019

Я использую поток изменений mongoDB в nodejs, все работает нормально, но, если база данных не работает, потребовалось более 10 5 секунд, чтобы подняться, ошибка истечения времени ожидания выброса потока изменений, вот мой код наблюдателя потока изменений

Service.prototype.watcher = function( db ){

let collection = db.collection('tokens');
let changeStream = collection.watch({ fullDocument: 'updateLookup' });
let resumeToken, newChangeStream;

changeStream.on('change', next => {
    resumeToken = next._id;
    console.log('data is ', JSON.stringify(next))
    changeStream.close();
    // console.log('resumeToken is ', JSON.stringify(resumeToken))
    newChangeStream = collection.watch({ resumeAfter : resumeToken });
    newChangeStream.on('change', next => {
        console.log('insert called ', JSON.stringify( next ))
    });
});

однако на стороне базы данных я обработал ее, то есть, если база данных отключена или переподключена с использованием этого кода

 this.db.on('reconnected', function () {
    console.info('MongoDB reconnected!');
});
this.db.on('disconnected', function() {
    console.warn('MongoDB disconnected!');
});

, но я не могу обработать наблюдатель потока изменений, чтобы остановить его, когда база данных не работает, и запуститьэто снова при переподключении базы данных или если есть какой-то другой лучший способ сделать это?

Ответы [ 2 ]

0 голосов
/ 12 февраля 2019

Это решение, которое я разработал, просто добавьте функцию stream.on (ошибка), чтобы он не зависал при возникновении ошибки, перезапустите поток при переподключении базы данных, а также сохраните токен возобновления в файле для каждого события, этополезно, когда приложение падает или останавливается, и вы запускаете снова и в течение этого времени, если было добавлено x количество записей, поэтому при перезапуске приложения просто получите последний маркер возобновления из файла и запустите оттуда наблюдателя, после чего все вставленные записи будут вставлены и, следовательно, нетзапись будет пропущена, вот код ниже

var rsToken ;
    try {
        rsToken = await this.getResumetoken()
    } catch (error) {
        rsToken = null ;
    }

    if (!rsToken)
        changeStream = collection.watch({ fullDocument: 'updateLookup' });
    else 
        changeStream = collection.watch({ fullDocument: 'updateLookup', resumeAfter : rsToken  });

    changeStream.on('change', next => {

        resumeToken = next._id;
        THIS.saveTokenInfile(resumeToken)

        cs_processor.process( next )


    });  
    changeStream.on('error', err => {
        console.log('changestream error ')
    })
0 голосов
/ 12 февраля 2019

То, что вы хотите сделать, это инкапсулировать вызов watch() в функцию.Затем эта функция вызывает себя при ошибке, чтобы пересмотреть коллекцию, используя ранее сохраненный токен возобновления.В вашем коде отсутствует обработчик ошибок.Например:

const MongoClient = require('mongodb').MongoClient
const uri = 'mongodb://localhost:27017/test?replicaSet=replset'
var resume_token = null

run()

function watch_collection(con, db, coll) {
  console.log(new Date() + ' watching: ' + coll)
  con.db(db).collection(coll).watch({resumeAfter: resume_token})
    .on('change', data => {
      console.log(data)
      resume_token = data._id
    })
    .on('error', err => {
      console.log(new Date() + ' error: ' + err)
      watch_collection(con, coll)
    })
}

async function run() {
  con = await MongoClient.connect(uri, {"useNewUrlParser": true})
  watch_collection(con, 'test', 'test')
}

Обратите внимание, что watch_collection() содержит метод watch() вместе с его обработчиком.При изменении он распечатает изменения и сохранит токен возобновления.В случае ошибки он снова вызывает себя для повторного просмотра коллекции.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...