最新消息:雨落星辰是一个专注网站SEO优化、网站SEO诊断、搜索引擎研究、网络营销推广、网站策划运营及站长类的自媒体原创博客

MongoDB的变换流超时如果数据库已关闭了一段时间

运维笔记admin16浏览0评论

MongoDB的变换流超时如果数据库已关闭了一段时间

MongoDB的变换流超时如果数据库已关闭了一段时间

我在使用的NodeJS MongoDB的变换流,一切工作正常,但如果数据库是下跌已超过10 5秒起床变换流抛出超时错误,这里是我的变换流观察家代码

Service.prototype.watcher = function( db ){

let collection = db.collection('tokens');
let changeStream = collection.watch({ fullDocument: 'updateLookup' });
let resumeToken, newChangeStream;

changeStream.on('change', next => {
    resumeToken = next._id;
    console.log('data is ', JSON.stringify(next))
    changeStream.close();
    // console.log('resumeToken is ', JSON.stringify(resumeToken))
    newChangeStream = collection.watch({ resumeAfter : resumeToken });
    newChangeStream.on('change', next => {
        console.log('insert called ', JSON.stringify( next ))
    });
});

然而,在数据库端我处理它,即如果数据库已关闭或重新连接通过使用此代码

 this.db.on('reconnected', function () {
    console.info('MongoDB reconnected!');
});
this.db.on('disconnected', function() {
    console.warn('MongoDB disconnected!');
});

但我不能够处理变换流守望者停止它时,数据库关闭,当数据库被重新连接再次启动它,或是否有任何其他更好的方式来做到这一点?

回答如下:

什么,你想要做的是封装在一个函数调用watch()。然后,这个函数会调用本身的错误,使用先前保存的简历令牌rewatch集合。什么是从你的代码缺少的就是错误处理程序。例如:

const MongoClient = require('mongodb').MongoClient
const uri = 'mongodb://localhost:27017/test?replicaSet=replset'
var resume_token = null

run()

function watch_collection(con, db, coll) {
  console.log(new Date() + ' watching: ' + coll)
  con.db(db).collection(coll).watch({resumeAfter: resume_token})
    .on('change', data => {
      console.log(data)
      resume_token = data._id
    })
    .on('error', err => {
      console.log(new Date() + ' error: ' + err)
      watch_collection(con, coll)
    })
}

async function run() {
  con = await MongoClient.connect(uri, {"useNewUrlParser": true})
  watch_collection(con, 'test', 'test')
}

需要注意的是watch_collection()包含watch()方法及其处理一起。在变化,它会打印的变化和存储恢复令牌。上的错误,它会调用自身再次rewatch集合。

发布评论

评论列表(0)

  1. 暂无评论