Mongodb源码查询流程

2018/03/09 Mongodb

MongoDB源码查询流程

mongo的查询请求

首先进入mongo的查询请求部分.mongo的查询请求部分归纳起来很简单就是将请求分装成一个Message结构,然后将其发送到服务端,等待服务端的相应数据,取得数据最后显示结果.下面来看具体流程分析吧。

当我们点击db.coll.find({x:1})时按照上一篇文章的讲解,我们首先来到了mongo/shell/dbshell.cpp

            if ( ! wascmd ) {
                try {
                    if ( scope->exec( code.c_str() , "(shell)" , false , true , false ) )//执行相应的javascript代码
                        scope->exec( "shellPrintHelper( __lastres__ );" , "(shell2)" , true , true , false );
                }
                catch ( std::exception& e ) {
                    cout << "error:" << e.what() << endl;
                }
            }

下面进入javascript代码,其在mongo/shell/collection.js.


	//这里因为我们只设置了query,所以其它选项都是空的,this.getQueryOptions()目前只有一个SlaveOK的option,在replset模式下是不能查询secondary服务器的,需要调用rs.SlaveOK()之后才能对secondary进行查询,其执行SlaveOK后每次查询时都会添加一个QueryOption.
DBCollection.prototype.find = function (query, fields, limit, skip, batchSize, options) {
    return new DBQuery( this._mongo , this._db , this ,
                        this._fullName , this._massageObject( query ) , fields , limit , skip , batchSize , options || this.getQueryOptions() );
}

继续前进看看DBQuery,看这里的new DBQuery对象的创建发生在:


    JSBool dbquery_constructor( JSContext *cx, JSObject *obj, uintN argc, jsval *argv, jsval *rval ) {
        try {
            smuassert( cx ,  "DDQuery needs at least 4 args" , argc >= 4 );
	    //整个代码都是创建一个DBQuery对象,并未进行任何的查询请求动作
            Convertor c(cx);
            c.setProperty( obj , "_mongo" , argv[0] );
            c.setProperty( obj , "_db" , argv[1] );
            c.setProperty( obj , "_collection" , argv[2] );
            c.setProperty( obj , "_ns" , argv[3] );
 
            if ( argc > 4 && JSVAL_IS_OBJECT( argv[4] ) )
                c.setProperty( obj , "_query" , argv[4] );
            else {
                JSObject * temp = JS_NewObject( cx , 0 , 0 , 0 );
                CHECKNEWOBJECT( temp, cx, "dbquery_constructor" );
                c.setProperty( obj , "_query" , OBJECT_TO_JSVAL( temp ) );
            }
 
            if ( argc > 5 && JSVAL_IS_OBJECT( argv[5] ) )
                c.setProperty( obj , "_fields" , argv[5] );
            else
                c.setProperty( obj , "_fields" , JSVAL_NULL );
 
 
            if ( argc > 6 && JSVAL_IS_NUMBER( argv[6] ) )
                c.setProperty( obj , "_limit" , argv[6] );
            else
                c.setProperty( obj , "_limit" , JSVAL_ZERO );
 
            if ( argc > 7 && JSVAL_IS_NUMBER( argv[7] ) )
                c.setProperty( obj , "_skip" , argv[7] );
            else
                c.setProperty( obj , "_skip" , JSVAL_ZERO );
 
            if ( argc > 8 && JSVAL_IS_NUMBER( argv[8] ) )
                c.setProperty( obj , "_batchSize" , argv[8] );
            else
                c.setProperty( obj , "_batchSize" , JSVAL_ZERO );
 
            if ( argc > 9 && JSVAL_IS_NUMBER( argv[9] ) )
                c.setProperty( obj , "_options" , argv[9] );
            else
                c.setProperty( obj , "_options" , JSVAL_ZERO );
 
            c.setProperty( obj , "_cursor" , JSVAL_NULL );
            c.setProperty( obj , "_numReturned" , JSVAL_ZERO );
            c.setProperty( obj , "_special" , JSVAL_FALSE );
        }
        catch ( const AssertionException& e ) {
            if ( ! JS_IsExceptionPending( cx ) ) {
                JS_ReportError( cx, e.what() );
            }
            return JS_FALSE;
        }
        catch ( const std::exception& e ) {
            log() << "unhandled exception: " << e.what() << ", throwing Fatal Assertion" << endl;
            fassertFailed( 16323 );
        }
        return JS_TRUE;
    }

可以看到上面只有DBQuery对象的构建动作,并没有真正的查询请求,那么查询请求去哪里了呢?回到

                try {
                    if ( scope->exec( code.c_str() , "(shell)" , false , true , false ) )//执行相应的javascript代码
                        scope->exec( "shellPrintHelper( __lastres__ );" , "(shell2)" , true , true , false );
                }

继续分析shellPrintHelper函数,这里__lastres__是什么,搜索整个source insight工程发现mongo/scripting/engine_spidermonkey.cpp中:


        bool exec( const StringData& code,const string& name = "(anon)",bool printResult = false,bool reportError = true, bool assertOnError = true,int timeoutMs = 0 ) {
            JSBool worked = JS_EvaluateScript( _context,
                                               _global,
                                               code.data(),
                                               code.size(),
                                               name.c_str(),
                                               1,
                                               &ret );
            if ( worked )
                _convertor->setProperty( _global , "__lastres__" , ret );
        }

原来__lastres__就是上一条执行语句的结果也就是这里的DBQuery对象.继续分析shellPrintHelper函数(mongo/util/util.js)


shellPrintHelper = function (x) {
    if (typeof (x) == "undefined") {
        // Make sure that we have a db var before we use it
        // TODO: This implicit calling of GLE can cause subtle, hard to track issues - remove?
        if (__callLastError && typeof( db ) != "undefined" && db.getMongo ) {
            __callLastError = false;
            // explicit w:1 so that replset getLastErrorDefaults aren't used here which would be bad.
            var err = db.getLastError(1);
            if (err != null) {
                print(err);
            }
        }
        return;
    }
 
    if (x == __magicNoPrint)
        return;
 
    if (x == null) {
        print("null");
        return;
    }
 
    if (typeof x != "object")
        return print(x);
 
    var p = x.shellPrint;//我们这里是DBQuery对象,所以执行到这里,来到了DBQuery.shellPrint函数
    if (typeof p == "function")
        return x.shellPrint();
 
    var p = x.tojson;
    if (typeof p == "function")
        print(x.tojson());
    else
        print(tojson(x));
}



DBQuery.prototype.shellPrint = function(){//(mongo/util/query.js)
    try {
        var start = new Date().getTime();
        var n = 0;//还有查询结果并且输出数目小于shellBatchSize,循环打印结果
        while ( this.hasNext() && n < DBQuery.shellBatchSize ){//这里shellBatchSize定义为20
            var s = this._prettyShell ? tojson( this.next() ) : tojson( this.next() , "" , true );
            print( s );//调用native函数native_print打印结果
            n++;
        }
        if (typeof _verboseShell !== 'undefined' && _verboseShell) {
            var time = new Date().getTime() - start;
            print("Fetched " + n + " record(s) in " + time + "ms");
        }
         if ( this.hasNext() ){
            print( "Type \"it\" for more" );
            ___it___  = this;
        }
        else {
            ___it___  = null;
        }
   }
    catch ( e ){
        print( e );
    }
    
}

继续看看hasNext函数和next函数:

DBQuery.prototype.hasNext = function(){
    this._exec();
 
    if ( this._limit > 0 && this._cursorSeen >= this._limit )//超过了限制返回false,将不会再输出结果
        return false;
    var o = this._cursor.hasNext();
    return o;
}
 
DBQuery.prototype.next = function(){
    this._exec();
    
    var o = this._cursor.hasNext();
    if ( o )
        this._cursorSeen++;
    else
        throw "error hasNext: " + o;
    
    var ret = this._cursor.next();
    if ( ret.$err && this._numReturned == 0 && ! this.hasNext() )
        throw "error: " + tojson( ret );
 
    this._numReturned++;
    return ret;
}

继续前进到_exec函数:

DBQuery.prototype._exec = function(){//到这里终于到了this._mongo.find
    if ( ! this._cursor ){
        assert.eq( 0 , this._numReturned );
        this._cursor = this._mongo.find( this._ns , this._query , this._fields , this._limit , this._skip , this._batchSize , this._options );
        this._cursorSeen = 0;
    }
    return this._cursor;
}

到这里我们来到了this._mongo.find,这里_mongo是一个Mongo对象,在上一篇文章中我们了解到find函数是本地函数mongo_find.继续分析 mongo_find(mongo/scripting/sm_db.cpp).这里删除了部分错误处理代码.


    JSBool mongo_find(JSContext *cx, JSObject *obj, uintN argc, jsval *argv, jsval *rval) {
            shared_ptr< DBClientWithCommands > * connHolder = (shared_ptr< DBClientWithCommands >*)JS_GetPrivate( cx , obj );
            smuassert( cx ,  "no connection!" , connHolder && connHolder->get() );
            DBClientWithCommands *conn = connHolder->get();
            Convertor c( cx );
            string ns = c.toString( argv[0] );
            BSONObj q = c.toObject( argv[1] );
            BSONObj f = c.toObject( argv[2] );
 
            int nToReturn = (int) c.toNumber( argv[3] );
            int nToSkip = (int) c.toNumber( argv[4] );
            int batchSize = (int) c.toNumber( argv[5] );
            int options = (int)c.toNumber( argv[6] );//上面一篇文章我们分析到这里的conn其实是由ConnectionString::connect函数返回的,其返回的对象指针可能是:DBClientConnection对应Master,也就是只设置了一个地址,DBClientReplicaSet对应pair或者set模式,SyncClusterConnection对应sync模式,继续分析流程我们选择最简单的Master模式,只有一个地址的服务端
            auto_ptr<DBClientCursor> cursor = conn->query( ns , q , nToReturn , nToSkip , f.nFields() ? &f : 0  , options , batchSize );
            if ( ! cursor.get() ) {
                log() << "query failed : " << ns << " " << q << " to: " << conn->toString() << endl;
                JS_ReportError( cx , "error doing query: failed" );
                return JS_FALSE;
            }
            JSObject * mycursor = JS_NewObject( cx , &internal_cursor_class , 0 , 0 );
            CHECKNEWOBJECT( mycursor, cx, "internal_cursor_class" );
            verify( JS_SetPrivate( cx , mycursor , new CursorHolder( cursor, *connHolder ) ) );
            *rval = OBJECT_TO_JSVAL( mycursor );
        return JS_TRUE;
    }

那么我们继续前进来到DBClientConnection::query函数,该函数只是简单调用了DBClientBase::query函数.


    auto_ptr<DBClientCursor> DBClientBase::query(const string &ns, Query query, int nToReturn,
            int nToSkip, const BSONObj *fieldsToReturn, int queryOptions , int batchSize ) {
        auto_ptr<DBClientCursor> c( new DBClientCursor( this,//根据传入的参数创建一个DBClientCursor对象
                                    ns, query.obj, nToReturn, nToSkip,
                                    fieldsToReturn, queryOptions , batchSize ) );
        if ( c->init() )//创建Message并向服务端发送查询请求
            return c;
        return auto_ptr< DBClientCursor >( 0 );
    }
    bool DBClientCursor::init() {
        Message toSend;
        _assembleInit( toSend );//构建将要发送的查询请求这是一个Message,具体来说Message负责发送数据,具体的数据是在MsgData中
        verify( _client );
        if ( !_client->call( toSend, *batch.m, false, &_originalHost ) ) {//实际的发送数据,同时这里发送了数据后会调用recv接收数据
            // log msg temp?                                              //接收的数据同样是MsgData,同样由Message来管理
            log() << "DBClientCursor::init call() failed" << endl;
            return false;
        }
        if ( batch.m->empty() ) {
            // log msg temp?
            log() << "DBClientCursor::init message from call() was empty" << endl;
            return false;
        }
        dataReceived();//根据上面的batch.m收到的数据得出查询是否成功成功则设置cursorId,下一次请求时operation就变动为dbGetmore了.
        return true;   //查询错误则抛出异常
    }

_assembleInit是创建一个message结构,若是第一次请求那么请求操作为dbQuery,若不是则请求操作为dbGetmore.来看看MsgData的具体结构吧. 回到mongo_find函数.

          auto_ptr<DBClientCursor> cursor = conn->query( ns , q , nToReturn , nToSkip , f.nFields() ? &f : 0  , options , batchSize );
            if ( ! cursor.get() ) {//这里得到了cursor
                log() << "query failed : " << ns << " " << q << " to: " << conn->toString() << endl;
                JS_ReportError( cx , "error doing query: failed" );
                return JS_FALSE;
            }
            JSObject * mycursor = JS_NewObject( cx , &internal_cursor_class , 0 , 0 );//将cursor封装成一个javascript对象,javascript就能
            CHECKNEWOBJECT( mycursor, cx, "internal_cursor_class" );//使用游标了
            verify( JS_SetPrivate( cx , mycursor , new CursorHolder( cursor, *connHolder ) ) );

    JSBool internal_cursor_hasNext(JSContext *cx, JSObject *obj, uintN argc, jsval *argv, jsval *rval) {
        try {
            DBClientCursor *cursor = getCursor( cx, obj );
            *rval = cursor->more() ? JSVAL_TRUE : JSVAL_FALSE;//这里返回的就是是否还有数据,如果本地没有查询数据了,那么其会再构建一个
        }                                                     //dbGetmore的请求向服务器请求更多数据,还是没有则返回false,表示没有数据了
        catch ( const AssertionException& e ) {
            if ( ! JS_IsExceptionPending( cx ) ) {
                JS_ReportError( cx, e.what() );
            }
            return JS_FALSE;
        }
        catch ( const std::exception& e ) {
            log() << "unhandled exception: " << e.what() << ", throwing Fatal Assertion" << endl;
            fassertFailed( 16290 );
        }
        return JS_TRUE;
    }

mongod的数据库加载

mongod服务对于客户端请求的处理在mongo/db/db.cpp MyMessageHandler::process中,其中调用了函数assembleResponse完成请求响应,我们就从这个函数开始入手分析

    void assembleResponse( Message &m, DbResponse &dbresponse, const HostAndPort& remote ) {
        if ( op == dbQuery ) {
            if( strstr(ns, ".$cmd") ) {
                isCommand = true;
                opwrite(m);//写入诊断用的log,默认loglevel为0,未开启,需要开启启动时加入--diaglog x,0 = off; 1 = writes, 2 = reads, 3 = both
                if( strstr(ns, ".$cmd.sys.") ) {//7 = log a few reads, and all writes.
                    if( strstr(ns, "$cmd.sys.inprog") ) {
                        inProgCmd(m, dbresponse);//查看当前进度的命令
                        return;
                    }
                    if( strstr(ns, "$cmd.sys.killop") ) {
                        killOp(m, dbresponse);//终止当前操作
                        return;
                    }
                    if( strstr(ns, "$cmd.sys.unlock") ) {
                        unlockFsync(ns, m, dbresponse);
                        return;
                    }
                }
            }
            else {
                opread(m);
            }
        }
        else if( op == dbGetMore ) {
            opread(m);
        }
        else {
            opwrite(m);
        }
        long long logThreshold = cmdLine.slowMS;//启动的时候设置的参数默认是100ms,当操作超过了这个时间且启动时设置--profile为1或者2
        bool shouldLog = logLevel >= 1;//时mongodb将记录这次慢操作,1为只记录慢操作,即操作时间大于了设置的slowMS,2表示记录所有操作
        if ( op == dbQuery ) {         //可通过--slowms设置slowMS
            if ( handlePossibleShardedMessage( m , &dbresponse ) )//这里和shard有关,以后会的文章会讲到
                return;
            receivedQuery(c , dbresponse, m );//真正的查询入口
        }
        else if ( op == dbGetMore ) {//已经查询了数据,这里只是执行得到更多数据的入口
            if ( ! receivedGetMore(dbresponse, m, currentOp) )
                shouldLog = true;
        }
                if ( op == dbKillCursors ) {
                    currentOp.ensureStarted();
                    logThreshold = 10;
                    receivedKillCursors(m);
                }
                else if ( op == dbInsert ) {//插入操作入口
                    receivedInsert(m, currentOp);
                }
                else if ( op == dbUpdate ) {//更新操作入口
                    receivedUpdate(m, currentOp);
                }
                else if ( op == dbDelete ) {//删除操作入口
                    receivedDelete(m, currentOp);
                }
        if ( currentOp.shouldDBProfile( debug.executionTime ) ) {//该操作将被记录,原因可能有二:一,启动时设置--profile 2,则所有操作将被
            // performance profiling is on                    //记录.二,启动时设置--profile 1,且操作时间超过了默认的slowMs,那么操作将被            else {//这个地方if部分被删除了,就是在不能获取锁的状况下不记录该操作的代码
                Lock::DBWrite lk( currentOp.getNS() );//记录具体记录操作,就是在xxx.system.profile集合中插入该操作的具体记录
                if ( dbHolder()._isLoaded( nsToDatabase( currentOp.getNS() ) , dbpath ) ) {
                    Client::Context cx( currentOp.getNS(), dbpath, false );
                    profile(c , currentOp );
                }
            }
        }

前进到receivedQuery,其解析了接收到的数据,然后调用runQuery负责处理查询,然后出来runQuery抛出的异常,直接进入runQuery.


    string runQuery(Message& m, QueryMessage& q, CurOp& curop, Message &result) {        
	shared_ptr<ParsedQuery> pq_shared( new ParsedQuery(q) );
        if ( pq.couldBeCommand() ) {//这里表明这是一个命令,关于mongodb的命令的讲解这里有一篇文章,我就不再分析了.
            BSONObjBuilder cmdResBuf;//http://www.cnblogs.com/daizhj/archive/2011/04/29/mongos_command_source_code.html
            if ( runCommands(ns, jsobj, curop, bb, cmdResBuf, false, queryOptions) ){}
			
        bool explain = pq.isExplain();//这里的explain来自这里db.coll.find().explain(),若使用了.explain()则为true,否则false
        BSONObj order = pq.getOrder();
        BSONObj query = pq.getFilter();
        // Run a simple id query.
        if ( ! (explain || pq.showDiskLoc()) && isSimpleIdQuery( query ) && !pq.hasOption( QueryOption_CursorTailable ) ) {
            if ( queryIdHack( ns, query, pq, curop, result ) ) {//id查询的优化
                return "";
            }
        }
        bool hasRetried = false;
        while ( 1 ) {//这里的ReadContext这这篇文章的主角,其内部在第一次锁数据库时完成了数据库的加载动作
                Client::ReadContext ctx( ns , dbpath ); // read locks
                replVerifyReadsOk(&pq);//还记得replset模式中无法查询secondary服务器吗,就是在这里限制的
                BSONObj oldPlan;
                if ( ! hasRetried && explain && ! pq.hasIndexSpecifier() ) {
                    scoped_ptr<MultiPlanScanner> mps( MultiPlanScanner::make( ns, query, order ) );
                    oldPlan = mps->cachedPlanExplainSummary();
                }//这里才是真正的查询,其内部很复杂,下一篇文章将讲到
                return queryWithQueryOptimizer( queryOptions, ns, jsobj, curop, query, order,
                                                pq_shared, oldPlan, shardingVersionAtStart, 
                                                pgfs, npfe, result );
            }
        }
    }

Client::ReadContext::ReadContext(const string& ns, string path, bool doauth ) {
        {
            lk.reset( new Lock::DBRead(ns) );//数据库锁,这里mongodb的锁机制本文将不会涉及到,感兴趣的自己分析
            Database *db = dbHolder().get(ns, path);
            if( db ) {//第一次加载时显然为空
                c.reset( new Context(path, ns, db, doauth) );
                return;
            }
        }
        if( Lock::isW() ) { //全局的写锁
			// write locked already
                DEV RARELY log() << "write locked on ReadContext construction " << ns << endl;
                c.reset( new Context(ns, path, doauth) );
            }
        else if( !Lock::nested() ) { 
            lk.reset(0);
            {
                Lock::GlobalWrite w;//加入全局的写锁,这里是真正的数据库加载地点
                Context c(ns, path, doauth);
            }
            // db could be closed at this interim point -- that is ok, we will throw, and don't mind throwing.
            lk.reset( new Lock::DBRead(ns) );
            c.reset( new Context(ns, path, doauth) );
        }
    }
    Database* DatabaseHolder::getOrCreate( const string& ns , const string& path , bool& justCreated ) {
        string dbname = _todb( ns );//将test.coll这种类型的字符串转换为test
        {
            SimpleMutex::scoped_lock lk(_m);
            Lock::assertAtLeastReadLocked(ns);
            DBs& m = _paths[path];//在配置的路径中找到已经加载的数据库,直接返回
            {
                DBs::iterator i = m.find(dbname); 
                if( i != m.end() ) {
                    justCreated = false;
                    return i->second;
                }
            }
        Database *db = new Database( dbname.c_str() , justCreated , path );//实际的数据读取
        {
            SimpleMutex::scoped_lock lk(_m);//数据库加载完成后按照路径数据库记录
            DBs& m = _paths[path];
            verify( m[dbname] == 0 );
            m[dbname] = db;
            _size++;
        }
        return db;
    }

Search

    微信好友

    博士的沙漏

    Table of Contents