天天看點

Thrift之TProcess類體系原理及源碼詳細解析

我的新浪微網誌: http://weibo.com/freshairbrucewoo

歡迎大家互相交流,共同提高技術。

  之前對Thrift自動生成代碼的實作細節做了詳細的分析,下面進行處理層的實作做詳細分析了!會利用到自動代碼生成的知識。

  這部分是協定層和使用者提供的服務實作之間的紐帶,定義了調用服務實作的接口架構,真正實作某種服務接口是通過上一章介紹的代碼生成工具生成的代碼。本章将介紹這個架構的基本原理,然後通過生成的一個執行個體來具體介紹怎樣完成一次完整的服務,這個可能涉及到下面章節的一些知識,對于這些知識不詳細分析其功能,隻是介紹它在其中起什麼作用。選擇的執行個體是Facebook内部用這個架構實作的一個分布式日志收集系統scribe。下面是這部分相關類的類關系圖:

Thrift之TProcess類體系原理及源碼詳細解析
  從上圖中可以看出TProcessor是這個部分的頂層基類,其他之類基本上都是通過Thrift代碼生成工具生成的,隻有少數是為了擴充一些功能而直接寫代碼實作,如PeekProcessor類就增加了一些對原始資料處理的功能。scribeProcessor和FacebookServiceProcessor類就是用代碼生成器根據IDL檔案生成的,也是我們後面需要分析的一個執行個體。

第一節 服務接口調用架構分析

  這個基本的架構包括三個類,一個就是抽象類TProcessor,負責調用使用者定義的服務接口,從一個接口讀入資料,寫入一個輸出接口。一個最主要的函數定義如下:

1 virtual bool process(boost::shared_ptr<protocol::TProtocol> in,
2 
3                        boost::shared_ptr<protocol::TProtocol> out, void* connectionContext) = 0;      

這個函數是一個純虛函數,是以繼承這個類的子類都必須實作這個函數,這個函數就是最主要的資料傳輸功能。

第二個類就是負責處理TProcessor類産生的事件的類TProcessorEventHandler,主要定義了一些當某事件發生時的處理函數,例如當讀取參數之前可以做一些處理功能。下面是這個類定義的各個成員函數,每一個函數都處理一種事件發送時的情況:

函數名稱 函數功能
getContext 調用其他回調函數之前調用,期望傳回一些有序的上下文對象以便傳遞給其他回調函數使用
freeContext 期望釋放一個上下文有關的資源
preRead 在讀參數以前調用
postRead 在讀參數和處理函數之間調用
preWrite 在處理和寫響應之間調用
postWrite 在寫響應之後調用
asyncComplete 當一個異步函數成功完成調用時調用
handlerError 如果處理函數抛出沒有定義的異常就會調用此函數

最後一個類就是TProcessorContextFreer類,這個類是一個幫助類,幫助生成的代碼來釋放上下文資源。

第二節 基于架構生成的服務執行個體分析

本節将對scribe伺服器采用的服務實作進行詳細分析。

1 接口定義語言檔案(IDL)

(1)Facebook内部共用服務協定

主要有兩個檔案,一個是在Thrift中定義,是用于Facebook内部的一些接口服務定義,這個不僅僅用于scribe伺服器,可能還用于Facebook内部其他系統,這個檔案内容如下:

Thrift之TProcess類體系原理及源碼詳細解析
1 namespace java com.facebook.fb303
 2 
 3 namespace cpp facebook.fb303
 4 
 5 namespace perl Facebook.FB303
 6 
 7 enum fb_status {
 8 
 9   DEAD = 0,
10 
11   STARTING = 1,
12 
13   ALIVE = 2,
14 
15   STOPPING = 3,
16 
17   STOPPED = 4,
18 
19   WARNING = 5,
20 
21 }
22 
23 service FacebookService {
24 
25   string getName(),
26 
27   string getVersion(),
28 
29   fb_status getStatus(),
30 
31   string getStatusDetails(),
32 
33   map<string, i64> getCounters(),
34 
35   i64 getCounter(1: string key),
36 
37   void setOption(1: string key, 2: string value),
38 
39   string getOption(1: string key),
40 
41   map<string, string> getOptions(),
42 
43   string getCpuProfile(1: i32 profileDurationInSec),
44 
45   i64 aliveSince(),
46 
47   oneway void reinitialize(),
48 
49   oneway void shutdown(),
50 
51 }      
Thrift之TProcess類體系原理及源碼詳細解析

上面這個IDL檔案定義了一個枚舉類型用于表示服務的狀态,還定義了一個名位FacebookService的服務,裡面定義了各種操作,如擷取服務狀态的操作、得到計數的操作等等。

下面我們來看看根據這個IDL檔案生成的C++代碼是什麼樣的一個架構。首先生成了一個基于上面服務定義的抽象類如下:

Thrift之TProcess類體系原理及源碼詳細解析
class FacebookServiceIf {

 public:

  virtual ~FacebookServiceIf() {}

  virtual void getName(std::string& _return) = 0;

  virtual void getVersion(std::string& _return) = 0;

  virtual fb_status getStatus() = 0;

  virtual void getStatusDetails(std::string& _return) = 0;

  virtual void getCounters(std::map<std::string, int64_t> & _return) = 0;

  virtual int64_t getCounter(const std::string& key) = 0;

  virtual void setOption(const std::string& key, const std::string& value) = 0;

  virtual void getOption(std::string& _return, const std::string& key) = 0;

  virtual void getOptions(std::map<std::string, std::string> & _return) = 0;

  virtual void getCpuProfile(std::string& _return, const int32_t profileDurationInSec) = 0;

  virtual int64_t aliveSince() = 0;

  virtual void reinitialize() = 0;

  virtual void shutdown() = 0;

};      
Thrift之TProcess類體系原理及源碼詳細解析

注意觀察,除了這個類多了一個虛析構函數,其他函數就是IDL中定義的。接着定義了類FacebookServiceNull,這個是上面那個抽象類的空實作(就是所有方法都沒有做具體的事情),這樣做的好處就是我們需要重寫一些函數的時候隻需要關注我們需要寫的函數,而不是重寫所有函數。接着又定義了封裝每一個函數參數的相應類,就是一個函數的參數都用一個類來封裝定義,函數的傳回值也是這樣處理。這樣做的目的是統一遠端調用的實作接口,因為傳遞參數都隻需要這個封裝類的對象就可以了。是以你會看到每一個服務裡面定義的函數都有下面一組類的定義:

Thrift之TProcess類體系原理及源碼詳細解析
1 (1)class FacebookService_getName_args {…}
 2 
 3 (2)class FacebookService_getName_pargs {…}
 4 
 5 (3)typedef struct _FacebookService_getName_result__isset {…} _FacebookService_getName_result__isset;
 6 
 7 (4)class FacebookService_getName_result{…}
 8 
 9 (5)typedef struct _FacebookService_getName_presult__isset {…} _FacebookService_getName_presult__isset;
10 
11 (6)class FacebookService_getName_presult{…}      
Thrift之TProcess類體系原理及源碼詳細解析

上面這六個類定義就是為服務中的getName函數服務的,相應的每一個函數都會有這種類似的定義和實作。接下來就會定義三個具體實作IDL定義的功能的類,一個用戶端的類,它繼承定義的服務抽象類,每一個具體的函數實作都是同樣的方式和思路,同樣我結合getName函數的實作來看看這個過程,其他函數都是這樣實作的,代碼如下:

1 send_getName();
2 
3 recv_getName(_return);      

由上面代碼可以看出首先調用函數發送函數名稱及相關資訊到遠端,然後接受函數調用的傳回值,發送函數send_getName()的代碼如下:

Thrift之TProcess類體系原理及源碼詳細解析
1 int32_t cseqid = 0;
 2 
 3 oprot_->writeMessageBegin("getName", ::apache::thrift::protocol::T_CALL, cseqid);//寫一個函數調用消息RPC
 4 
 5 FacebookService_getName_pargs args;
 6 
 7 args.write(oprot_);//寫入參數
 8 
 9 oprot_->writeMessageEnd();
10 
11 oprot_->getTransport()->writeEnd();
12 
13 oprot_->getTransport()->flush();//保證這次寫入過程立即生效      
Thrift之TProcess類體系原理及源碼詳細解析

上面代碼就完成了函數名稱以及參數的傳輸,調用的是TProtocol相關的類的函數實作,具體的實作内容和方式會在TProtocol部分介紹。下面接着看一下接收傳回值的函數recv_getName的代碼:

Thrift之TProcess類體系原理及源碼詳細解析
1   int32_t rseqid = 0;//接收的消息序列号
 2 
 3   std::string fname;//函數名稱
 4 
 5   ::apache::thrift::protocol::TMessageType mtype;//消息的類型(調用(T_CALL)、異常(T_EXCEPTION)等)
 6 
 7   iprot_->readMessageBegin(fname, mtype, rseqid);//從傳回消息讀取函數名稱、消息類型
 8 
 9   if (mtype == ::apache::thrift::protocol::T_EXCEPTION) {//處理異常消息
10 
11     ::apache::thrift::TApplicationException x;
12 
13     x.read(iprot_);
14 
15     iprot_->readMessageEnd();
16 
17     iprot_->getTransport()->readEnd();
18 
19     throw x;
20 
21   }
22 
23   if (mtype != ::apache::thrift::protocol::T_REPLY) {//處理傳回消息
24 
25     iprot_->skip(::apache::thrift::protocol::T_STRUCT);
26 
27     iprot_->readMessageEnd();
28 
29     iprot_->getTransport()->readEnd();
30 
31   }
32 
33   if (fname.compare("getName") != 0) {//看是否是我們需要的函數名,不是就跳過消息讀取
34 
35     iprot_->skip(::apache::thrift::protocol::T_STRUCT);
36 
37     iprot_->readMessageEnd();
38 
39     iprot_->getTransport()->readEnd();
40 
41   }
42 
43   FacebookService_getName_presult result;
44 
45   result.success = &_return;
46 
47   result.read(iprot_);//讀取函數傳回值
48 
49   iprot_->readMessageEnd();
50 
51   iprot_->getTransport()->readEnd();
52 
53   if (result.__isset.success) {//成功就傳回結果(已經在_return裡面),否則抛出異常
54 
55     return;
56 
57   }
58 
59   throw ::apache::thrift::TApplicationException(::apache::thrift::TApplicationException::MISSING_RESULT, "getName failed: unknown result");      
Thrift之TProcess類體系原理及源碼詳細解析

上面代碼就是處理遠端調用的傳回結果,代碼裡面有注釋。一個服務函數的實作大概流程已經展現在我們面前了,處理的過程也已經清晰。這個隻是用于用戶端的處理流程,必須通過有效的機制來通知伺服器端調用相應的函數(這就是RPC)在伺服器端完成相應功能并将結果傳回。這種機制就是通過我們這部分介紹的TProcessor類實作,這就是上面提到三個類中的第二個類,在這個執行個體中是FacebookServiceProcessor類,它從TProcessor類繼承,重點實作兩個函數process和process_fn,其中process會調用process_fn函數來處理用戶端具體調用的那個服務函數,process函數定義如下:

Thrift之TProcess類體系原理及源碼詳細解析
1 bool FacebookServiceProcessor::process(boost::shared_ptr< ::apache::thrift::protocol::TProtocol> piprot, 
 2 
 3 boost::shared_ptr< ::apache::thrift::protocol::TProtocol> poprot, void* callContext) {
 4 
 5 ::apache::thrift::protocol::TProtocol* iprot = piprot.get();
 6 
 7 ::apache::thrift::protocol::TProtocol* oprot = poprot.get();
 8 
 9 std::string fname;
10 
11 ::apache::thrift::protocol::TMessageType mtype;
12 
13 int32_t seqid;
14 
15   iprot->readMessageBegin(fname, mtype, seqid);//讀取得到函數名稱、消息類型和函數序列号
16 
17 //處理不是函數調用消息的情況
18 
19 if (mtype != ::apache::thrift::protocol::T_CALL && mtype != ::apache::thrift::protocol::T_ONEWAY) {
20 
21     iprot->skip(::apache::thrift::protocol::T_STRUCT);
22 
23     iprot->readMessageEnd();
24 
25     iprot->getTransport()->readEnd();
26 
27     ::apache::thrift::TApplicationException x(::apache::thrift::TApplicationException::INVALID_MESSAGE_TYPE);
28 
29 //寫入(傳回)一個異常資訊給調用用戶端,用戶端會根據傳回結果處理異常
30 
31     oprot->writeMessageBegin(fname, ::apache::thrift::protocol::T_EXCEPTION, seqid);
32 
33     x.write(oprot);
34 
35     oprot->writeMessageEnd();
36 
37     oprot->getTransport()->writeEnd();
38 
39     oprot->getTransport()->flush();
40 
41     return true;
42 
43 }
44 
45 return process_fn(iprot, oprot, fname, seqid, callContext);//調用實際的函數處理
46 
47 }      
Thrift之TProcess類體系原理及源碼詳細解析

上面代碼有比較詳細的注釋,還需要說明一點的就是如果傳遞的不是函數調用的消息類型就會傳回給用戶端一個異常的消息,用戶端的接收傳回值的函數就會根據收到的異常消息做相應處理,上面getName函數的接收傳回值函數就是抛出一個伺服器端給的異常資訊。下面繼續看最終伺服器端調用相應映射函數的處理,這個是通過process_fn函數實作:具體定義如下:

Thrift之TProcess類體系原理及源碼詳細解析
1 bool FacebookServiceProcessor::process_fn(::apache::thrift::protocol::TProtocol* iprot,
 2 
 3 ::apache::thrift::protocol::TProtocol* oprot, std::string& fname, int32_t seqid, void* callContext) {
 4 
 5 //定義個map的疊代器,用于接收在函數映射查找到的映射函數
 6 
 7 std::map<std::string, void (FacebookServiceProcessor::*)(int32_t, ::apache::thrift::protocol::TProtocol*, 
 8 
 9 ::apache::thrift::protocol::TProtocol*, void*)>::iterator pfn;
10 
11   pfn = processMap_.find(fname);//根據函數名稱查找對應的映射處理函數
12 
13   if (pfn == processMap_.end()) {//如果沒有找到,做下面的處理
14 
15     iprot->skip(::apache::thrift::protocol::T_STRUCT);
16 
17     iprot->readMessageEnd();
18 
19     iprot->getTransport()->readEnd();
20 
21 //抛出一個不知道的方法的異常
22 
23     ::apache::thrift::TApplicationException x(::apache::thrift::TApplicationException::UNKNOWN_METHOD, 
24 
25 "Invalid method name: '"+fname+"'");
26 
27 //寫入到調用用戶端
28 
29     oprot->writeMessageBegin(fname, ::apache::thrift::protocol::T_EXCEPTION, seqid);
30 
31     x.write(oprot);
32 
33     oprot->writeMessageEnd();
34 
35     oprot->getTransport()->writeEnd();
36 
37     oprot->getTransport()->flush();
38 
39     return true;
40 
41   }
42 
43   (this->*(pfn->second))(seqid, iprot, oprot, callContext);//調用具體的函數(RPC過程完成)
44 
45   return true;
46 
47 }      
Thrift之TProcess類體系原理及源碼詳細解析

上面這個函數最終完成了RPC的過程,那個函數與映射函數的對應關系的map結構是在構造函數中初始化的,是以可以找到,例如我們舉例的getName函數是下面這樣初始化的:

1 processMap_["getName"] = &FacebookServiceProcessor::process_getName;      

和getName函數一樣,對于IDL定義的每一個函數在FacebookServiceProcessor類中都有一個映射的處理函數,為了展示一個完整的處理過程我們在看看getName函數的映射處理函數process_getName,它的定義如下:

Thrift之TProcess類體系原理及源碼詳細解析
1 void FacebookServiceProcessor::process_getName(int32_t seqid,
 2 
 3 ::apache::thrift::protocol::TProtocol* iprot, ::apache::thrift::protocol::TProtocol* oprot, void* callContext)
 4 
 5 {
 6 
 7 void* ctx = NULL;
 8 
 9 if (eventHandler_.get() != NULL) {
10 
11 //得到上下文調用環境
12 
13     ctx = eventHandler_->getContext("FacebookService.getName", callContext);
14 
15   }
16 
17 //定義并初始化一個用于釋放資源的幫助類對象
18 
19   ::apache::thrift::TProcessorContextFreer freer(eventHandler_.get(), ctx, "FacebookService.getName");
20 
21   if (eventHandler_.get() != NULL) {
22 
23     eventHandler_->preRead(ctx, "FacebookService.getName");//讀之前事件處理
24 
25   }
26 
27   FacebookService_getName_args args;
28 
29   args.read(iprot);
30 
31   iprot->readMessageEnd();
32 
33   uint32_t bytes = iprot->getTransport()->readEnd();
34 
35   if (eventHandler_.get() != NULL) {
36 
37     eventHandler_->postRead(ctx, "FacebookService.getName", bytes);//讀取和讀完之間的事件處理
38 
39   }
40 
41   FacebookService_getName_result result;
42 
43   try {
44 
45     iface_->getName(result.success);//這是重點:調用伺服器端的getName函數
46 
47     result.__isset.success = true;
48 
49   } catch (const std::exception& e) {
50 
51     if (eventHandler_.get() != NULL) {
52 
53       eventHandler_->handlerError(ctx, "FacebookService.getName");//錯誤處理
54 
55     }
56 
57 //寫入具體的異常到用戶端
58 
59     ::apache::thrift::TApplicationException x(e.what());
60 
61     oprot->writeMessageBegin("getName", ::apache::thrift::protocol::T_EXCEPTION, seqid);
62 
63     x.write(oprot);
64 
65     oprot->writeMessageEnd();
66 
67     oprot->getTransport()->writeEnd();
68 
69     oprot->getTransport()->flush();
70 
71     return;
72 
73   }
74 
75   if (eventHandler_.get() != NULL) {
76 
77     eventHandler_->preWrite(ctx, "FacebookService.getName");//寫入之前事件處理
78 
79   }
80 
81 //寫入調用傳回值(T_REPLY)消息到調用用戶端
82 
83   oprot->writeMessageBegin("getName", ::apache::thrift::protocol::T_REPLY, seqid);
84 
85   result.write(oprot);
86 
87   oprot->writeMessageEnd();
88 
89   bytes = oprot->getTransport()->writeEnd();
90 
91   oprot->getTransport()->flush();
92 
93   if (eventHandler_.get() != NULL) {
94 
95     eventHandler_->postWrite(ctx, "FacebookService.getName", bytes);//寫相應之後處理
96 
97   }
98 
99 }      
Thrift之TProcess類體系原理及源碼詳細解析

上面這個函數就是真正完成伺服器端調用用戶端傳遞過來的函數的處理過程,有事件處理類處理相應的事件(不過,目前都還是空實作,以後可以繼承這個處理類重寫需要處理事件的函數,例如:在調用伺服器真正的處理函數之前可以先處理一下參數,驗證參數是否正确之類的),也有幫助釋放資源的幫助類。

(2)scribe服務IDL檔案

Thrift之TProcess類體系原理及源碼詳細解析
1 include "/home/brucewoo/thrift-0.6.1/contrib/fb303/if/fb303.thrift"
 2 
 3 namespace cpp scribe.thrift
 4 
 5 namespace java scribe.thrift
 6 
 7 namespace perl Scribe.Thrift
 8 
 9 enum ResultCode
10 
11 {
12 
13   OK,
14 
15   TRY_LATER
16 
17 }
18 
19 struct LogEntry
20 
21 {
22 
23   1:  string category,
24 
25   2:  string message
26 
27 }
28 
29 service scribe extends fb303.FacebookService
30 
31 {
32 
33   ResultCode Log(1: list<LogEntry> messages);
34 
35 }      
Thrift之TProcess類體系原理及源碼詳細解析

這個IDL檔案隻定義了一個服務接口,就是用完成日志檔案傳輸的幾個Log,不過這個服務繼承FacebookService服務,是以上面介紹FacebookService服務的功能它也具備,傳輸日志的結構就是分類和具體的消息。這個服務的具體實作和上面介紹的FacebookService流程都是一樣的,不在詳細介紹,隻要知道一點就是:用戶端在調用Log寫日志到scribe伺服器的時候就會傳遞到伺服器端來調用同名的函數處理日志。

第三節 總結

TProcessor類體系主要定義一個服務生産的架構,通過這個架構生産的各種語言的代碼可以實作RPC調用,具體的傳輸細節、協定和方式是通過後面講解的内容實作的。

第二節對一個具體服務的實作内容做詳細分析,不過都是基于文字描述和代碼分析,下面根據scribe服務提供的Log函數怎樣完成一次具體的處理過程用下面的圖形展示:

Thrift之TProcess類體系原理及源碼詳細解析

這個圖形并沒有展示内部資料通信的細節,隻是簡單的說明了一個用戶端的調用是怎樣完成的,伺服器處理還涉及到很多相關細節,将在後面章節中詳細分析。