天天看點

【轉】Darwin Streaming Server 核心代碼分析

基本概念

首先,我針對的代碼是Darwin StreamingServer 6.0.3未經任何改動的版本。

DarwinStreaming Server從設計模式上看,采用了Reactor的并發伺服器設計模式,如果對Reactor有一定的了解會有助于對DarwinStreaming Server核心代碼的了解。

Reactor模式是典型的事件觸發模式,當有事件發生時則完成相應的Task,Task的完成是通過調用相應的handle來實作的,對于handle的調用是由有限個數的Thread來完成的。

DarwinStreaming Server中定義了一個Task類。Task類有兩個典型的方法,一個是Signal,一個是Run。調用Signal把一個Task加入到TaskThread的Task隊列中,等待完成,Run就是完成一個任務的handle。基于Task類,定義了三種類型的Task,分别是IdleTask,TimeoutTask,以及普通的Task。

在Darwin StreamingServer中,除了主線程以外,有三種類型的線程,分别是TaskThread,EventThread以及IdleTaskThread:

1.    TaskThread,TaskThread通過運作Task類型對象的Run方法來完成相應Task的處理。典型的Task類型是RTSPSession和RTPSession。TaskThread的個數是可配置的,預設情況下TaskThread的個數與處理器的個數一緻。等待被TaskThread調用并運作的Task放在隊列或者堆中。

2.    EventThread,EventThread負責偵聽套接口事件,在DarwinStreaming Server中,有兩種被偵聽的事件,分别是建立RTSP連接配接請求的到達和RTSP請求的到達。對于RTSP連接配接請求的事件,EventThread建立一個RTSPSession,并啟動針對相應的socket的偵聽。對于RTSP請求的事件,EventThread把對應的RTSPSession類型的Task加入到TaskThread的隊列中,等待RTSP請求被處理。

3.    IdleTaskThread,IdleTaskThread管理IdleTask類型對象的隊列,根據預先設定的定時器觸發IdleTask的排程。TCPListenerSocket就是一個IdleTask的派生類,當并發連接配接數達到設定的最大值時,會把派生自TCPListenerSocket的RTSPListenerSocket加入到IdleTaskThread管理的IdleTask隊列中,暫時停止對RTSP端口的偵聽,直到被設定好的定時器觸發。

核心架構

下圖是DarwinStreaming Server核心架構的示意圖。在這個示意圖中有三種類型的要素,分别是線程,Task隊列或者堆,被偵聽的事件。圖中的文字都是從源代碼中copy出來的,便于讀者通過查找與源代碼對應起來。

圖中給出了三個線程,分别是TaskThread::Entry,EventThread::Entry以及IdleTaskThread::Entry。前文已經對這三種線程進行了概要描述。

除了三個線程,圖中還有另外五個矩形塊。與TaskThread::Entry線程相關聯的有兩個,分别是TaskThread::fTaskQueue隊列和TaskThread::fHeap堆,通過調用Signal被排程等待完成的Task就放在隊列或者堆中。與IdleTaskThread::Entry線程相關聯的有一個,是IdleTaskThread::IdleHeap堆。與EventThread::Entry相關聯的是EventContext::fEventReq,是被偵聽的端口。還有一個是TimeoutTaskThread::fQueue隊列,它事實上是通過TimeoutTask與TaskThread::Entry相關聯。

圖中指向線程的連接配接線表明從隊列或者堆中取出Task,而對于EventThread::Entry線程來說,則是被偵聽事件的發生。指向被偵聽的端口的連接配接線表明把端口加入偵聽,指向Task的隊列或堆的連接配接線,表明把Task加入到隊列或者堆中。連接配接線的文字給出的是相應的函數調用,可以直接在源代碼中搜尋到。

EventThread

系統啟動的時候調用QTSServer::StartTasks()把RTSP服務端口加入到偵聽隊列中。此時便開始接收用戶端的RTSP連接配接請求了。

在EventThread::Entry中調用select_waitevent函數等待事件的發生,當有事件發生的時候,就通過調用ProcessEvent方法對事件進行相應的處理。注意ProcessEvent是一個虛函數,共有兩個實作。EventContext類中實作了ProcessEvent方法,EventContext的派生類TCPListenerSocket中也實作了ProcessEvent方法。

對于建立RTSP連接配接的請求,調用TCPListenerSocket::ProcessEvent方法來處理,此方法調用RTSPListenerSocket的GetSessionTask方法建立一個RTSPSession,并把相應的套接口加入偵聽隊列,等待RTSP請求。然後還需調用this->RequestEvent(EV_RE)把建立RTSP連接配接的請求加入到偵聽隊列。

對于RTSP連接配接上的RTSP請求事件,調用的是EventContext::ProcessEvent方法,通過Task的Signal把對應的RTSPSession類型的Task加入到TaskThread::fTaskQueue中等待TaskThread處理。

TaskThread與Task

TaskThread::Entry調用TaskThread::WaitForTask()方法獲得下一個需要處理的Task。TaskThread::WaitForTask()首先從TaskThread::fHeap中獲得Task,如果TaskThread::fHeap中沒有滿足條件的Task,則從TaskThread::fTaskQueue中獲得Task。

TaskThread::Entry調用Task::Run方法來完成對應的Task,Task::Run方法的傳回值類型是SInt64,也即signedlong long int類型。TaskThread::Entry根據Task::Run方法的傳回值進行不同的處理。對于小于0的傳回值,需delete這個Task;對于大于0的傳回值,傳回值代表了下次處理這個Task需等待的時間,TaskThread::Entry調用fHeap.Insert(&theTask->fTimerHeapElem)把Task插入到堆裡,并設定等待時間。對于等于0的傳回值,TaskThread::Entry不再理會該Task。

TimeoutTask

從代碼中看,TimeoutTaskThread是IdleTask的派生類,分析後發現從TimeoutTaskThread與IdleTask沒有任何關系,完全可以從Task派生,修改代碼後驗證了這個想法。是以TimeoutTaskThread就是一個普通的Task,TimeoutTaskThread通過其Run方法監控一組逾時任務,具體的比如RTSP協定或者RTP協定逾時。

在系統啟動的時候TimeoutTaskThread被加入到TaskThread的隊列中,這是通過在StartServer函數中調用TimeoutTask::Initialize()來實作的。TimeoutTaskThread::Run函數的傳回值是intervalMilli= kIntervalSeconds * 1000,也就是一個正數,于是TimeoutTaskThread這個Task會加入到TaskThread::fHeap中被周期性的調用。

TimeoutTaskThread::Run方法發現有逾時的任務,則通過Signal方法排程這個Task,event為Task::kTimeoutEvent。被管理的這組任務,要有RefreshTimeout的機制。

一次點播請求的處理

為了更好的了解DarwinStreaming Server的架構,我們從用戶端發起點播,觸發伺服器的建立RTSP連接配接事件的發生開始,看看DSS的工作流程是什麼樣的。

針對RTSP協定,DarwingStreaming Server在554端口上偵聽,當有連接配接請求到達時,通過accept調用傳回一個socket,對應的後續RTSP請求都是通過這個socket來傳送的。我們把RTSP相關的事件分成兩類,一類是RTSP連接配接請求,一類是RTSP請求。先來看RTSP連接配接請求的過程:

1.    RTSP連接配接請求到達後,被select_waitevent函數捕獲,代碼在EventContext.cpp的EventThread::Entry中232行。

2.    查找EventThread::fRefTable,擷取對應的EventContext。得到的是EventContext類型的派生類RTSPListenerSocket。相應的代碼在EventContext.cpp中的249到253行。

3.    調用ProcessEvent,處理事件。相應的代碼在EventContext.cpp中的257行。注意,由于對應的EventContext類其實是RTSPListenerSocket,是以調用的應該是TCPListenerSocket::ProcessEvent。

4.    在TCPListenerSocket.cpp的106行TCPListenerSocket::ProcessEvent方法中,調用accept得到socket,在160行調用了GetSessionTask方法,對應的是RTSPListenerSocket::GetSessionTask,在QTSServer.cpp中定義。

5.    在RTSPListenerSocket::GetSessionTask方法中,QTSServer.cpp的1077行,調用NEWRTSPSession建立了一個新的RTSPSession。

6.    回到TCPListenerSocket.cpp檔案中的TCPListenerSocket::ProcessEvent方法,注意189行,把剛剛建立好的RTSP連接配接加入到偵聽隊列,等待RTSP請求的到來。

RTSP請求的處理流程步驟如下,注意前面第一步是一樣的:

2.    查找EventThread::fRefTable,擷取對應的EventContext。得到的是EventContext類。相應的代碼在EventContext.cpp中的249到253行。

3.    調用ProcessEvent,處理事件。相應的代碼在EventContext.cpp中的257行。注意,此時調用的是EventContext::ProcessEvent。

4.    EventContext::ProcessEvent方法在EventContext.h中實作,在127行。在138行調用了fTask->Signal(Task::kReadEvent),fTask就是RTSPSession類。把RTSPSession加入到TaskThread的隊列等待RTSPSession::Run()被調用。

5.    後續就是RTSPSession::Run()對RTSP請求的具體的處理。

(全文完)

繼續閱讀