天天看點

pomelo代碼分析7———— pomelo的rpc實作分析

想哪寫哪,前面的坑如果不大礙事,咱就不填了。嘿嘿~

RPC:remote procedure call;是啥玩意就不說了。

下面直接看pomelo的實作:

對應module是pomelo-rpc,這個直接npm install可以就可以安裝。

比如: npm install -d pomelo-rpc,下載下傳到目前目錄的node_modules目錄裡面。

安裝完畢以後目錄如下:

AUTHORS  index.js  lib  LICENSE  Makefile  node_modules  package.json  README-Chinese.md  README.md  sample  test
           

可以看到,實際上pomelo-rpc還是依賴不少其他module的,具體這些module幹啥用,這個就不說了。自己看吧。

我們直接上:

首先sample目錄如下:

sample/
├── client.js
├── remote
│   └── test
│       └── service.js
└── server.js
           

直接node server.js可以啟動一個server等待client的rpc調用。

我們再在其他終端裡面:node client.js 就可以看到結果。

我們看看lib下面有什麼:

lib/
├── rpc-client
│   ├── client.js
│   ├── mailboxes
│   │   ├── blackhole.js
│   │   ├── tcp-mailbox.js
│   │   └── ws-mailbox.js
│   ├── mailbox.js
│   ├── mailstation.js
│   └── router.js
├── rpc-server                    #rpc 的server端實作
│   ├── acceptor.js               #接收rpc的Factory,隻有幾行代碼,這裡寫死了是直接調用ws-acceptor.js,也就是websocket的實作。
│   ├── acceptors                 #具體接收器的實作目錄
│   │   ├── tcp-acceptor.js          #tcp
│   │   └── ws-acceptor.js           #websocket
│   ├── dispatcher.js             #rpc的分發實作,具體就是根據下面會介紹到的樹形結構來找到對應的rpc處理函數。
│   ├── gateway.js                #這個實際上才是真正的server,負責服務的start,stop,并且初始化acceptor的callback函數 ———— dispatcher
│   └── server.js                 #1、加載子產品,并且把子產品當成參數來初始化gateway。2:建構樹形結構來注冊這些被初始化的子產品。
└── util
    ├── proxy.js
    ├── tracer.js
    └── utils.js
           

OK,從檔案上看server的邏輯時序如下

server.js : #1、加載子產品,并且把子產品當成參數來初始化 gateway。2)建構樹形結構,來注冊這些被初始化的子產品

gateway.js:# 初始化acceptor的callback函數 ———— dispatcher

         acceptor.js:# acceptor的factory,這裡寫死了隻生産ws類型的acceptor

          ws-acceptor.js #websocket方式的接收器具體實作。

dispatcher.js:#簡單根據client調用的樹形結構來逐級查找注冊的module,并且調用該module。

下面扒代碼:

第一部分:注冊,生成callback的樹形結構

sample/server.js

1 var Server = require('..').server;
  2  
  3 debugger;
  4 // remote service path info list
  5 var paths = [
  6   {namespace: 'user', path: __dirname + '/remote/test'}  #這裡就是所有要注冊的module,本sample隻有一個。
  7 ];
  8                                                                                                                                                            
  9 var port = 3333;
 10  
 11 var server = Server.create({paths: paths, port: port});
 12 server.start();
 13 console.log('rpc server started.');
           

看出來pomelo裡面的rpc使用封裝的很漂亮。

其中11行 會調用到 lib/rpc-server/server.js檔案的create函數,如下:

module.exports.create = function(opts) {
  if(!opts || !opts.port || opts.port < 0 || !opts.paths) {
    throw new Error('opts.port or opts.paths invalid.');
  }

  var services = loadRemoteServices(opts.paths, opts.context);
  opts.services = services;
  var gateway = Gateway.create(opts);
  return gateway;
};
           

可以看出,實際上一個server就是一個gateway !!

這個gateway會接受 被loadRemoteServices生成的一系列services作為參數。

我們再看這個函數:

4 var loadRemoteServices = function(paths, context) {
  5   var res = {}, item, m;
  6   for(var i=0, l=paths.length; i<l; i++) {
  7     item = paths[i];
  8     m = Loader.load(item.path, context);
  9  
 10     if(m) {
 11       createNamespace(item.namespace, res);
 12       for(var s in m) {
 13         res[item.namespace][s] = m[s];
 14       }
 15     }
 16   }
 17  
 18   return res;
 19 };
 20  
           
21 var createNamespace = function(namespace, proxies) {
 22   proxies[namespace] = proxies[namespace] || {};
 23 };
 24  
           

其中load()前面一篇文章我們已經說過了,就是pomelo-loader。是以m就是一個module;

如前說述,所有的module生成工作和exports導出工作在load()函數已經完成。

第10 行後面的幾行代碼隻是為了組織res而已,仔細看代碼,或者通過debuger列印,可以看出。

>res
  Object
    user: Object
      service: Object
        echo: function (msg, cb) {  ...
           

pomelo會根據 初始化時候傳入的namespace,js 子產品的檔案名,module中的exports名稱來建構,

res[namespace][filename][exports.name] = m[exports.name];

這裡就是一個樹形的注冊而已。等着被後面client來call。

而具體的call邏輯如下:

第二部分:調用,觸發邏輯

上面第一部分已經注冊了所有的module,當server.start()的時候,就是調用gateway.start(),進而調用accepter.listen();

這裡的acceptor就是ws-accptor.js,對應部分代碼如下:

pro.listen = function(port) {
  //check status
  if(!!this.inited) {
    utils.invokeCallback(this.cb, new Error('already inited.'));
    return;
  }
  this.inited = true;

  var self = this;

  this.server = sio.listen(port);             # 1、第一步,調用socket.io的listen去監聽網絡。

  this.server.set('log level', 0);

  this.server.server.on('error', function(err) {  #錯誤處理,不說了。。
    self.emit('error', err);
  });

  this.server.sockets.on('connection', function(socket) { #2、注冊client的連結事件;
    self.sockets[socket.id] = socket;

    self.emit('connection', {id: socket.id, ip: socket.handshake.address.address});

    socket.on('message', function(pkg) {                 #3、在連結建立好以後,我們注冊 “message”事件到ProcessMsgs()函數。  
      try {
        if(pkg instanceof Array) {
          processMsgs(socket, self, pkg);
        } else {
          processMsg(socket, self, pkg);
        }
      } catch(e) {
        // socke.io would broken if uncaugth the exception
        console.error('rpc server process message error: ' + e);
      }
    });

    socket.on('disconnect', function(reason) {          # 斷開事件,不說了。。。
      delete self.sockets[socket.id]; 
      delete self.msgQueues[socket.id];
    });
  });

  this.on('connection', ipFilter.bind(this));          #白名單,不說了。。。。

  if(this.bufferMsg) {
    this._interval = setInterval(function() {
      flush(self);
    }, this.interval);
  }
};
           

看上面代碼中的注釋,我們可以知道,一旦client有消息過來,我們就調用下面的函數:

var processMsg = function(socket, acceptor, pkg) {
  var tracer = new Tracer(acceptor.rpcLogger, acceptor.rpcDebugLog, pkg.remote, pkg.source, pkg.msg, pkg.traceId, pkg.seqId);
  tracer.info('server', __filename, 'processMsg', 'ws-acceptor receive message and try to process message');
  acceptor.cb.call(null, tracer, pkg.msg, function() {        #####我們隻看這個##########################
    var args = Array.prototype.slice.call(arguments, 0);
    for(var i=0, l=args.length; i<l; i++) {
      if(args[i] instanceof Error) {
        args[i] = cloneError(args[i]);
      }
    }
    。。。。。。。。。。。。。。。。。。。。。。
           

接着上面說: 一旦client有消息來,我們就調用processMsg(),而processMsg()則調用acceptor的回調函數。也就是dispatcher.route()函數;

這個是在gateway初始化的時候挂在acceptor上面的。具體代碼如下:

this.acceptor = this.acceptorFactory.create(opts, function(tracer, msg, cb) {
    dispatcher.route(tracer, msg, self.services, cb);
  });
           

我們再看,這個route()到底幹啥,如下:

module.exports.route = function(tracer, msg, services, cb) {
  tracer.info('server', __filename, 'route', 'route messsage to appropriate service object');
  var namespace = services[msg.namespace];

  var service = namespace[msg.service];


  var method = service[msg.method];

  var args = msg.args.slice(0);
  args.push(cb);
  method.apply(service, args);
};
           

 上面代碼去掉錯誤檢測,看上去直覺點。

就是:

methode = services[msg.namespace][msg.sevice][msg.method] 而已。

而這裡的msg屬性如下:

namespace :“user”

service:“service”

method:“echo”

看見了吧,就是上面第一部分注冊的時候我們建構的樹形結構嘛。

如果client調用的時候寫的不比對,則會在上面代碼中錯誤檢測部分被淘汰。

OK,完畢:

總結:

第一步:server建構一個樹形的對應關系。

然後:根據client傳遞過來的并且被解析後的JSON來查找并且調用樹形結構中對應的函數。

================下面看client,(坑)

繼續閱讀