想哪寫哪,前面的坑如果不大礙事,咱就不填了。嘿嘿~
RPC:remote procedure call;是啥玩意就不說了。
下面直接看pomelo的實作:
對應module是pomelo-rpc,這個直接npm install可以就可以安裝。
比如: npm install -d pomelo-rpc,下載下傳到目前目錄的node_modules目錄裡面。
安裝完畢以後目錄如下:
AUTHORS index.js lib LICENSE Makefile node_modules package.json README-Chinese.md README.md sample test
可以看到,實際上pomelo-rpc還是依賴不少其他module的,具體這些module幹啥用,這個就不說了。自己看吧。
我們直接上:
首先sample目錄如下:
sample/
├── client.js
├── remote
│ └── test
│ └── service.js
└── server.js
直接node server.js可以啟動一個server等待client的rpc調用。
我們再在其他終端裡面:node client.js 就可以看到結果。
我們看看lib下面有什麼:
lib/
├── rpc-client
│ ├── client.js
│ ├── mailboxes
│ │ ├── blackhole.js
│ │ ├── tcp-mailbox.js
│ │ └── ws-mailbox.js
│ ├── mailbox.js
│ ├── mailstation.js
│ └── router.js
├── rpc-server #rpc 的server端實作
│ ├── acceptor.js #接收rpc的Factory,隻有幾行代碼,這裡寫死了是直接調用ws-acceptor.js,也就是websocket的實作。
│ ├── acceptors #具體接收器的實作目錄
│ │ ├── tcp-acceptor.js #tcp
│ │ └── ws-acceptor.js #websocket
│ ├── dispatcher.js #rpc的分發實作,具體就是根據下面會介紹到的樹形結構來找到對應的rpc處理函數。
│ ├── gateway.js #這個實際上才是真正的server,負責服務的start,stop,并且初始化acceptor的callback函數 ———— dispatcher
│ └── server.js #1、加載子產品,并且把子產品當成參數來初始化gateway。2:建構樹形結構來注冊這些被初始化的子產品。
└── util
├── proxy.js
├── tracer.js
└── utils.js
OK,從檔案上看server的邏輯時序如下
server.js : #1、加載子產品,并且把子產品當成參數來初始化 gateway。2)建構樹形結構,來注冊這些被初始化的子產品
gateway.js:# 初始化acceptor的callback函數 ———— dispatcher
acceptor.js:# acceptor的factory,這裡寫死了隻生産ws類型的acceptor
ws-acceptor.js #websocket方式的接收器具體實作。
dispatcher.js:#簡單根據client調用的樹形結構來逐級查找注冊的module,并且調用該module。
下面扒代碼:
第一部分:注冊,生成callback的樹形結構
sample/server.js
1 var Server = require('..').server;
2
3 debugger;
4 // remote service path info list
5 var paths = [
6 {namespace: 'user', path: __dirname + '/remote/test'} #這裡就是所有要注冊的module,本sample隻有一個。
7 ];
8
9 var port = 3333;
10
11 var server = Server.create({paths: paths, port: port});
12 server.start();
13 console.log('rpc server started.');
看出來pomelo裡面的rpc使用封裝的很漂亮。
其中11行 會調用到 lib/rpc-server/server.js檔案的create函數,如下:
module.exports.create = function(opts) {
if(!opts || !opts.port || opts.port < 0 || !opts.paths) {
throw new Error('opts.port or opts.paths invalid.');
}
var services = loadRemoteServices(opts.paths, opts.context);
opts.services = services;
var gateway = Gateway.create(opts);
return gateway;
};
可以看出,實際上一個server就是一個gateway !!
這個gateway會接受 被loadRemoteServices生成的一系列services作為參數。
我們再看這個函數:
4 var loadRemoteServices = function(paths, context) {
5 var res = {}, item, m;
6 for(var i=0, l=paths.length; i<l; i++) {
7 item = paths[i];
8 m = Loader.load(item.path, context);
9
10 if(m) {
11 createNamespace(item.namespace, res);
12 for(var s in m) {
13 res[item.namespace][s] = m[s];
14 }
15 }
16 }
17
18 return res;
19 };
20
21 var createNamespace = function(namespace, proxies) {
22 proxies[namespace] = proxies[namespace] || {};
23 };
24
其中load()前面一篇文章我們已經說過了,就是pomelo-loader。是以m就是一個module;
如前說述,所有的module生成工作和exports導出工作在load()函數已經完成。
第10 行後面的幾行代碼隻是為了組織res而已,仔細看代碼,或者通過debuger列印,可以看出。
>res
Object
user: Object
service: Object
echo: function (msg, cb) { ...
pomelo會根據 初始化時候傳入的namespace,js 子產品的檔案名,module中的exports名稱來建構,
res[namespace][filename][exports.name] = m[exports.name];
這裡就是一個樹形的注冊而已。等着被後面client來call。
而具體的call邏輯如下:
第二部分:調用,觸發邏輯
上面第一部分已經注冊了所有的module,當server.start()的時候,就是調用gateway.start(),進而調用accepter.listen();
這裡的acceptor就是ws-accptor.js,對應部分代碼如下:
pro.listen = function(port) {
//check status
if(!!this.inited) {
utils.invokeCallback(this.cb, new Error('already inited.'));
return;
}
this.inited = true;
var self = this;
this.server = sio.listen(port); # 1、第一步,調用socket.io的listen去監聽網絡。
this.server.set('log level', 0);
this.server.server.on('error', function(err) { #錯誤處理,不說了。。
self.emit('error', err);
});
this.server.sockets.on('connection', function(socket) { #2、注冊client的連結事件;
self.sockets[socket.id] = socket;
self.emit('connection', {id: socket.id, ip: socket.handshake.address.address});
socket.on('message', function(pkg) { #3、在連結建立好以後,我們注冊 “message”事件到ProcessMsgs()函數。
try {
if(pkg instanceof Array) {
processMsgs(socket, self, pkg);
} else {
processMsg(socket, self, pkg);
}
} catch(e) {
// socke.io would broken if uncaugth the exception
console.error('rpc server process message error: ' + e);
}
});
socket.on('disconnect', function(reason) { # 斷開事件,不說了。。。
delete self.sockets[socket.id];
delete self.msgQueues[socket.id];
});
});
this.on('connection', ipFilter.bind(this)); #白名單,不說了。。。。
if(this.bufferMsg) {
this._interval = setInterval(function() {
flush(self);
}, this.interval);
}
};
看上面代碼中的注釋,我們可以知道,一旦client有消息過來,我們就調用下面的函數:
var processMsg = function(socket, acceptor, pkg) {
var tracer = new Tracer(acceptor.rpcLogger, acceptor.rpcDebugLog, pkg.remote, pkg.source, pkg.msg, pkg.traceId, pkg.seqId);
tracer.info('server', __filename, 'processMsg', 'ws-acceptor receive message and try to process message');
acceptor.cb.call(null, tracer, pkg.msg, function() { #####我們隻看這個##########################
var args = Array.prototype.slice.call(arguments, 0);
for(var i=0, l=args.length; i<l; i++) {
if(args[i] instanceof Error) {
args[i] = cloneError(args[i]);
}
}
。。。。。。。。。。。。。。。。。。。。。。
接着上面說: 一旦client有消息來,我們就調用processMsg(),而processMsg()則調用acceptor的回調函數。也就是dispatcher.route()函數;
這個是在gateway初始化的時候挂在acceptor上面的。具體代碼如下:
this.acceptor = this.acceptorFactory.create(opts, function(tracer, msg, cb) {
dispatcher.route(tracer, msg, self.services, cb);
});
我們再看,這個route()到底幹啥,如下:
module.exports.route = function(tracer, msg, services, cb) {
tracer.info('server', __filename, 'route', 'route messsage to appropriate service object');
var namespace = services[msg.namespace];
var service = namespace[msg.service];
var method = service[msg.method];
var args = msg.args.slice(0);
args.push(cb);
method.apply(service, args);
};
上面代碼去掉錯誤檢測,看上去直覺點。
就是:
methode = services[msg.namespace][msg.sevice][msg.method] 而已。
而這裡的msg屬性如下:
namespace :“user”
service:“service”
method:“echo”
看見了吧,就是上面第一部分注冊的時候我們建構的樹形結構嘛。
如果client調用的時候寫的不比對,則會在上面代碼中錯誤檢測部分被淘汰。
OK,完畢:
總結:
第一步:server建構一個樹形的對應關系。
然後:根據client傳遞過來的并且被解析後的JSON來查找并且調用樹形結構中對應的函數。
================下面看client,(坑)