摘要 所謂的流式媒體簡單的講就是指人們通過網絡實時的收看多媒體資訊:如音頻流、視訊流等。與流式媒體對應的傳統工作方式是下載下傳+播放模式,即使用者首先下載下傳多媒體檔案,然後再在本地播放,這種方法的一個主要缺點是啟動延遲較大,例如一個30分鐘長的MPEG-I檔案(相當于VCD品質),即使使用1.5Mbps的速率下載下傳,也需要半個小時才能完成,這樣一個漫長的等待時間實在是無法忍受。在窄帶網絡環境中,幾乎所有基于Internet的流式媒體産品都有着類似的工作原理:首先需要開發高效的壓縮編碼技術,并通過一套完整有效的傳輸體系将其釋出到使用者的桌面上。目前在流式媒體領域,有三種占有主導地位的産品,它們分别是App...
2 Darwin流化伺服器介紹
DSS源代碼完全采用标準C++語言寫成,程式設計風格非常優秀,每個C++類都對應着一對和類同名的.h/.cpp檔案。但是由于大量采用了面向對象的概念,如繼承、多态等等;而且源檔案和類相當多,是以不太容易講清楚。是以,讀者最好事先把代碼完整的過濾一兩遍,再配合本文,就能看得更清楚點。
其中,最為重要的是基礎功能類庫(CommonUtilitiesLib)和流化伺服器(StreamingServer)兩個工程,前者是整個系統的通用代碼工具箱,包括了線程管理、資料結構、網絡和文本分析等多個功能子產品。DSS和其他相關的工具使用基礎功能類庫工程中定義的功能類實作以下三個目标:
(1)抽象出系統中相同或類似的功能,用于降低代碼的備援度;
(2)封裝基本功能,簡化高層編碼的複雜度;
(3)隔離開作業系統平台相關的代碼。
而流化伺服器工程中包含了DSS對多個國際标準的實作,是整個伺服器的主工程。在本文中,我們将重點分析這兩個工程中的核心代碼和子產品。另外,我們還将簡單介紹利用DSS提供的開發接口(Module)擴充和定制伺服器的方法。
DSS實作了四種IETF制定的國際标準,分别是:實時流傳輸協定RTSP(Real-time Streaming Protocol, RFC 2326)、實時傳輸協定(RTP Real-time Transfer Protocol,RFC 1889)、實時傳輸控制協定RTCP(Real-time Transport Control Protocol,RFC 1889)、會話描述協定SDP(Session Description Protocol,RFC 2327)。這四個标準是開發所有流式媒體産品都必須掌握的,是以在對相關代碼進行分析和二次開發之前,希望讀者了解上述四種協定的基本思想,上述協定樣本可從以下網站獲得:http://www.ietf.org
3 基礎功能類庫(Common Utilities)
3.2 Tasks類
因為伺服器從整體上采用了異步的運作模式,這就需要一種用于事件通信的機制。舉例來說:一個RTSP連接配接對應的Socket端口監測到網絡上有資料到達,此時必須有一個子產品(或代碼)被通知(notify)去處理這些資料。為此,DSS定義了Task及其相關類作為實作這一通信機制的核心。
在Task.h/cpp檔案中,定義了三個主要的類,分别是:任務線程池類(TaskThreadPool Class)、任務線程類(TaskThread Class)以及任務類(Task Class)。
每個Task對象有兩個主要的方法:Signal和Run。當伺服器希望發送一個事件給某個Task對象時,就會調用Signal()方法;而Run()方法是在Task對象獲得處理該事件的時間片後運作的,伺服器中的大部分工作都是在不同Task對象的Run()函數中進行的。每個Task對象的目标就是利用很小的且不會阻塞的時間片完成伺服器指定某個工作。
任務線程類是上文介紹的OSThread類的一個子類,代表專門用于運作任務類的一個線程。在每個任務線程對象内部都有一個OSQueue_Blocking類型的任務隊列,存儲該線程需要執行的任務。後面的分析可以看到,伺服器調用一個任務的Signal函數,實際上就是将該任務加入到某個任務線程類的任務隊列中去。另外,為了統一管理這些任務線程,DSS還開發了任務線程池類,該類負責生成、删除以及維護内部的任務線程清單。圖4描述了任務類的運作。
下面我們首先分析TashThread類,該類的定義如下:
class TaskThread : public OSThread //OSThread的子類
{ //提示:所有的Task對象都将在TaskThread中運作
1 public:
2 TaskThread() : OSThread(), fTaskThreadPoolElem(this){} //構造函數
3 virtual ~TaskThread() { this->StopAndWaitForThread(); } //析構函數
4 private:
…
5 virtual void Entry(); //從OSThread重載的執行函數,仍然能夠被子類重載
6 Task* WaitForTask(); //檢測是否有該執行的任務
7 OSQueueElem fTaskThreadPoolElem; //對應的線程池對象
8 OSHeap fHeap; //紀錄任務運作時間的堆,用于WaitForTask函數
/*關鍵資料結構:任務隊列;在Task的Signal函數中直接調用fTaskQueue對象的EnQueue函數将自己加入任務隊列*/
9 OSQueue_Blocking fTaskQueue;
//此處略…
}
作為OSThread的子類,TaskThread重載了Entry函數,一旦TaskThread的對象被執行個體化,便運作該函數。Entry()函數的主要任務就是調用WaitForTask()函數監測任務隊列,如果發現新任務,就在規定時間内執行;否則,就被阻塞。下面我們簡要分析Entry()函數的流程:
void TaskThread::Entry()
{
1 Task* theTask = NULL; //空任務
2 while (true) //線程循環執行
3 { //監測是否有需要執行的任務,如果有就傳回該任務;否則阻塞;
4 theTask = this->WaitForTask();
5 Assert(theTask != NULL);
6 Bool16 doneProcessingEvent = false; //尚未處理事件
7 while (!doneProcessingEvent)
8 {
9 theTask->fUseThisThread = NULL; // 對任務的排程獨立于線程
10 SInt64 theTimeout = 0; //Task中Run函數的傳回值,重要
//核心部分:運作任務,根據傳回值判斷任務進度
11 if (theTask->fWriteLock)
12 { //如果任務中有寫鎖,需要使用寫互斥量,否則可能造成死鎖
13 OSMutexWriteLocker mutexLocker(&TaskThreadPool::sMutexRW);
14 theTimeout = theTask->Run(); //運作任務,得到傳回值
15 theTask->fWriteLock = false;
16 }
17 else
18 { //使用讀互斥量
19 OSMutexReadLocker mutexLocker(&TaskThreadPool::sMutexRW);
20 theTimeout = theTask->Run(); //運作任務,得到傳回值
21 }
22 //監測Task中Run()函數的傳回值,共有三種情況
23 //1、傳回負數,表明任務已經完全結束
24 if (theTimeout 25 {
26 delete theTask; //删除Task對象
27 theTask = NULL;
28 doneProcessingEvent = true;
19 }
30 //2、傳回0,表明任務希望在下次傳信時被再次立即執行
31 else if (theTimeout=0)
32 {
33 doneProcessingEvent = compare_and_store(Task::kAlive, 0, &theTask->fEvents);
34 if (doneProcessingEvent)
35 theTask = NULL;
36 }
//3、傳回正數,表明任務希望在等待theTimeout時間後再次執行
37 else
38 {
/*将該任務加入到Heap中,并且紀錄它希望等待的時間。Entry()函數将通過waitfortask()函數進行檢測,如果等待的時間到了,就再次運作該任務*/
39 theTask->fTimerHeapElem.SetValue(OS::Milliseconds() + theTimeout);
40 fHeap.Insert(&theTask->fTimerHeapElem);
41 (void)atomic_or(&theTask->fEvents, Task::kIdleEvent);//設定Idle事件
42 doneProcessingEvent = true;
43 }
注意,如果Task的Run()函數傳回值TimeOut為正數,意味着該任務是一個周期性的工作,例如發送資料的視訊泵(pump),需要每隔一定時間就發出一定量的視訊資料,直至整個節目結束。為此,在第38~43行,将該任務加入到堆fHeap中去,并且标記該任務下次運作的時間為TimeOut毫秒之後。将來通過調用WaitForTask()函數就能檢測到該任務是否到達規定的運作時間,WaitForTask()函數的代碼如下:
Task* TaskThread::WaitForTask()
1 while (true)
2 { //得到目前時間,該函數為靜态函數,定義見OS.h
3 SInt64 theCurrentTime = OS::Milliseconds();
/*如果堆中有任務,且任務已經到執行時間,傳回該任務。 PeekMin函數見OSHeap.h,竊聽堆中第一個元素(但不取出)*/
4 if ((fHeap.PeekMin() != NULL) && (fHeap.PeekMin()->GetValue() 從堆中取出第一個任務傳回
5 return (Task*)fHeap.ExtractMin()->GetEnclosingObject();
//如果堆中有任務,但是尚未到執行時間,計算需要等待的時間
6 SInt32 theTimeout = 0;
7 if (fHeap.PeekMin() != NULL) //計算還需等待的時間
8 theTimeout = fHeap.PeekMin()->GetValue() - theCurrentTime;
9 Assert(theTimeout >= 0);
//等待theTimeout時間後從堆中取出任務傳回
10 OSQueueElem* theElem = fTaskQueue.DeQueueBlocking(this, theTimeout);
11 if (theElem != NULL)
12 return (Task*)theElem->GetEnclosingObject();
13 }
}
void Task::Signal(EventFlags events)
// fUseThisThread用于指定該任務運作的任務線程
1 if (fUseThisThread != NULL) //存在指定任務線程
//将該任務加入到指定任務線程的任務隊列中
2 fUseThisThread->fTaskQueue.EnQueue(&fTaskQueueElem);
//不存在指定的任務線程,随機選擇一個任務線程運作該任務
3 else
4 {
//從線程池中随機選擇一個任務線程
5 unsigned int theThread = atomic_add(&sThreadPicker, 1);
6 theThread %= TaskThreadPool::sNumTaskThreads;
//将該任務加入到上面選擇的任務線程的任務隊列中
7 TaskThreadPool::sTaskThreadArray[theThread]-> fTaskQueue.EnQueue (&fTaskQueueElem);
8 }
至此我們已經将DSS的線程和任務運作機制分析完了,這種由事件去觸發任務的概念已經被內建到了DSS的各個子系統中。例如,在DSS中經常将一個Task對象和一個Socket對象關聯在一起,當Socket對象收到事件(通過select()函數),相對應的Task對象就會被傳信(通過Signal()函數);而包含着處理代碼的Run()函數就将在某個任務線程中運作。
是以,通過使用這些Task對象,我們就可以讓所有連接配接都使用一個線程來處理,這也是DSS的預設配置方法。
3.3 Socket類
作為一個典型的網絡伺服器,DSS源代碼中的Socket程式設計部分是其精華之一。DSS定義了一系列Socket類用于屏蔽不同平台在TCP/UDP程式設計接口和使用方法上的差異。DSS中的Socket類一般都采用異步模式的(即非阻塞的),而且能夠向對應的Task對象傳信(Signal),這點我們在上一節介紹過。Socket類中具有代表性的類是:EventContext、EventThread、Socket、UDPSocket、TCPSocket以及TCPListenerSocket等等,它們之間的繼承關系見圖5。
在eventcontext.h/.cpp檔案中,定義了兩個類:EventContext類和EventThread類。 Event Context提供了檢測Unix式的檔案描述符(Socket就是一種檔案描述符)産生的事件(通常是EV_RE 或 EV_WR)的能力,同時還可以傳信指定的任務。EventThread類是OSThread類的子類,它本身很簡單,隻是重載了OSThread的純虛函數Entry(),用以監控所有的Socket端口是否有資料到來,其代碼分析如下:
void EventThread::Entry()
/*該結構定義在ev.h中,記錄Socket描述符和在該描述符上發生的事件*/
1 struct eventreq theCurrentEvent;
2 ::memset( &theCurrentEvent, '\0', sizeof(theCurrentEvent) ); //初始化該結構
3 while (true)
4 {
//首先監聽Socket端口的事件
5 int theErrno = EINTR;
6 while (theErrno=EINTR)
7 {
8 #if MACOSXEVENTQUEUE //Macos平台
9 int theReturnValue = waitevent(&theCurrentEvent, NULL);
10 #else //其他平台
/*調用select_waitevent函數監聽所有的Socket端口,直到有事件發生為止*/
11 int theReturnValue = select_waitevent(&theCurrentEvent, NULL);
12 #endif
//有事件發生,喚醒相應的Socket端口
13 if (theCurrentEvent.er_data != NULL)
14 {
//通過事件中的辨別找到相應的對象參考指針
15 StrPtrLen idStr((char*)&theCurrentEvent.er_data, sizeof(theCurrentEvent.er_data));
16 OSRef* ref = fRefTable.Resolve(&idStr);
17 if (ref != NULL)
18 { //通過參考指針得到EventContext對象
19 EventContext* theContext = (EventContext*)ref->GetObject();
//利用EventContext對象的ProcessEvent方法傳信對應的Task
20 theContext->ProcessEvent(theCurrentEvent.er_eventbits);
21 fRefTable.Release(ref); //減少引用計數
22 }
//此處略…
上述代碼有兩點需要注意:首先在第11行,調用select_waitevent函數監聽所有Socket端口的事件。該函數在Windows平台上是采用WSAAsyncSelect(異步選擇)模型實作的。具體實作是:系統首先建立一個視窗類,該類專門用于接受消息;在每個Socket端口建立後,調用WSAsyncSelect函數,同時将上述視窗類的句柄作為參數傳入;将來這些Socket端口有事件發生時,Windows就會自動将這些事件映射為标準的Windows消息發送給視窗類,此時select_waitevent函數通過檢查消息就能夠獲得對應Socket端口發生的事件。對于Windows平台下Socket的異步程式設計技術細節請參閱《Windows網絡程式設計技術》一書。
另外,在第20行調用的EventContext對象的ProcessEvent函數實作上很簡單,隻有一行代碼:fTask->Signal(Task::kReadEvent);其中fTask為該EventContext對象對應的Task對象;ProcessEvent函數向Task對象傳信,以便及時處理剛剛發生的Socket事件。
與EventThread對應的EventContext對象負責維護指定的描述符,其主要函數包括InitNonBlocking、CleanUp和RequestEvent等。其中InitNonBlocking函數調用Socket API ioctlsocket将使用者指定的描述符設定為異步,CleanUp函數用于關閉該描述符;另外,使用者通過RequestEvent函數申請對該描述符中某些事件的監聽,如前所述,該函數内部調用了WSAsyncSelect來實作這一功能。
Socket Class、UDPSocket Class和TCPSocketClass三個類都是EventContext的子類,它們封裝了TCP和UDP的部分實作,同時擴充了EventContext中的事件,但都沒有改變其運作機制,是以此處不再詳述,留給讀者自行分析。我們要為大家分析的是另外一個比較複雜的Socket類TCPListenerSocket類。TCPListenerSocket用于監聽TCP端口,當一個新連接配接請求到達後,該類将賦予這個新連接配接一個Socket對象和一個Task對象的配對。首先分析TCPListenerSocket類的主要定義如下:
class TCPListenerSocket : public TCPSocket, public IdleTask
/*提示:該類從有兩個基類,是以它既是一個事件監聽者,同時也是一個任務Task。作為一個任務,給TCPListenerObject發送Kill事件就可以删除它*/
2 TCPListenerSocket() : TCPSocket(NULL, Socket::kNonBlockingSocketType), IdleTask(), fAddr(0), fPort(0), fOutOfDescriptors(false) {} //構造函數
3 virtual ~TCPListenerSocket() {} //析構函數
//addr為位址,port為端口号,初始化函數自動監聽TCP端口
4 OS_Error Initialize(UInt32 addr, UInt16 port);
//子類必須重載該純虛函數,用于建立新連接配接時生成任務對象
5 virtual Task* GetSessionTask(TCPSocket** outSocket) = 0;
6 virtual SInt64 Run(); //重載Task的Run函數,子類仍可重載
7 private:
//重載EventContext的ProcessEvent函數,用于産生Socket和Task對象配對
8 virtual void ProcessEvent(int eventBits);
9 OS_Error Listen(UInt32 queueLength);
//其他略…
前面我們分析得知,EventContext類通過ProcessEvent函數來實作對任務的傳信工作,但在TCPListenerSocket 中,ProcessEvent函數被重載用來建立Socket和Task對象得配對,該函數的實作如下:
void TCPListenerSocket::ProcessEvent(int /*eventBits*/)
{ /*提示:該函數運作于系統唯一的EventThread線程中,是以要盡量快速,以免占用過多的系統資源*/
//此處略去部分定義…
1 Task* theTask = NULL; //Task對象
2 TCPSocket* theSocket = NULL; //Socket對象
//建立對象配對
4 { //accept連接配接
5 int osSocket = accept(fFileDesc, (struct sockaddr*)&addr, &size);
6 if (osSocket == -1) //監聽端口出錯
7 { //此處略去出錯處理 }
//用子類重載的GetSessionTask函數建立Task對象
8 if ((theTask = this->GetSessionTask(&theSocket))=NULL) //建立出錯
9 close(osSocket);
10 else //建立成功,接着建立Socket對象
11 {
12 Assert(osSocket != EventContext::kInvalidFileDesc);
//此處略去部分對建立連接配接端口的設定(setsockopt函數)
//建立新的Socket對象
13 theSocket->Set(osSocket, &addr);
14 theSocket->InitNonBlocking(osSocket); //初始化
15 theSocket->SetTask(theTask); //設定對應的任務
16 theSocket->RequestEvent(EV_RE); //新對象監聽讀事件
17 }
18 }
//處理完一次連接配接請求後,TCPListenerSocket對象還要接着監聽
19 this->RequestEvent(EV_RE);
對Socket類的分析基本完成了,從中我們可以發現,DSS對于網絡傳信和任務排程之間的處理非常精密,環環相扣,在某種程度上甚至是有些過a于花哨。但是這些基本類是上層RTSP/RTP等伺服器子系統編碼的基礎,是以希望讀者能夠從本質上掌握這些代碼。
4 核心功能庫(Server Core)
4.1 RTSP 子系統
RTSP标準是實時流控制協定(Real-Time Streaming Protocol RFC2326)的簡稱,它被客戶和流式媒體伺服器用來交換對媒體的控制資訊。圖6是RTSP基本操作的描述。
再給出一個RTSP協定的例子如下:
DSS開發了一個RTSP子系統來支援标準的RTSP協定,本節将分析這些源代碼。
首先,DSS定義了一個TCPListenerSocket類的子類RTSPListenerSocket,用于監聽RTSP連接配接請求。RTSPListenerSocket類做的唯一一件事就是重載了GetSessionTask函數,當客戶的連接配接請求到達後,它建立了一個Socket對象和RTSPSession對象的配對。RTSPSession對象是Task類的子類,是專門用于處理RTSP請求的任務類。
如圖7所示,RTSP連接配接建立後,伺服器會為每個客戶維護一個Socket對象和RTSPSession對象的配對;當客戶的RTSP請求到達時,Socket對象就會調用RTSPSession對象的Signal方法傳信,即将RTSPSession對象加入到TaskThread對象的任務隊列中去;而當時間片到來,TaskThread線程就會調用RTSPSession對象的Run方法,這個方法就會處理客戶發送過來的RTSP請求。是以,下面我們将主要分析RTSPSession的Run方法。
為了跟蹤目前處理的情況,RTSPSession類内部定義了多個狀态,而Run方法其實就是通過在這些狀态之間不斷切換,同時對客戶的RTSP請求做出不同的處理。
enum
{
//RTSPSession的基本狀态
kReadingRequest= 0,
kFilteringRequest= 1,
kRoutingRequest= 2,
kAuthenticatingRequest= 3,
kPreprocessingRequest= 4,
kProcessingRequest= 5,
kSendingResponse= 6,
kPostProcessingRequest = 7,
kCleaningUp= 8,
//當RTSP協定通過HTTP隧道實作時将用到下面的狀态
kWaitingToBindHTTPTunnel = 9,
kSocketHasBeenBoundIntoHTTPTunnel = 10,
kHTTPFilteringRequest = 11,
kReadingFirstRequest = 12,
kHaveNonTunnelMessage = 13
}
另外,值得注意的是,DSS提供一種稱為Module的二次開發模式,開發人員可以編寫新的Module并且注冊其希望運作的狀态,系統就會在相應的狀态下調用該Module,進而将控制權暫時交給二次開發的代碼,以便增強系統的功能。簡單起見,下面我們将分析不存在客戶子產品的Run()函數源代碼。首先分析其主架構如下:
SInt64 RTSPSession::Run()
1 EventFlags events = this->GetEvents(); //取出事件
2 QTSS_Error err = QTSS_NoErr;
3 QTSSModule* theModule = NULL;
4 UInt32 numModules = 0;
// 設定目前的Module狀态
5 OSThread::GetCurrent()->SetThreadData(&fModuleState);
//檢查該連接配接是否逾時,如果是就設定狀态斷掉該連接配接
6 if ((events & Task::kTimeoutEvent) || (events & Task::kKillEvent))
7 fLiveSession = false;
8 while (this->IsLiveSession()) //如果連接配接尚未拆除,執行狀态機
9 {
/* 提示:下面是RTSPSession的狀态機。因為在處理RTSP請求過程中,有多個地方需要Run方法傳回以便繼續監聽新的事件。為此,我們需要跟蹤目前的運作狀态,以便在被打斷後還能回到原狀态*/
10 switch (fState)
11 {
12 case 狀态1: //處理略
13 case 狀态2: //處理略…
14 case 狀态n: //處理略
15 } //此處略…
Run函數的主架構比較簡單,其核心就在于10~15的狀态機,是以我們希望按照客戶請求到達并且被處理的主要流程為讀者描述該狀态機的運轉。
1第一次請求到達進入kReadingFirstRequest狀态,該狀态主要負責從RTSPRequestStream類的對象fInputStream中讀出客戶的RTSP請求,其處理如下:
case kReadingFirstRequest:
{
1 if ((err = fInputStream.ReadRequest())=QTSS_NoErr)
2 {/* RequestStream傳回QTSS_NoErr意味着所有資料已經從Socket中讀出,但尚不能構成一個完整的請求,是以必須等待更多的資料到達*/
3 fInputSocketP->RequestEvent(EV_RE); //接着請求監聽讀事件
4 return 0; //Run函數傳回,等待下一個事件發生
5 }
6 if ((err != QTSS_RequestArrived) && (err != E2BIG))
7 {//出錯,停止處理
8 Assert(err > 0);
9 Assert(!this->IsLiveSession());
10 break;
11 }
//請求已經完全到達,轉入kHTTPFilteringRequest狀态
12 if (err = QTSS_RequestArrived)
13 fState = kHTTPFilteringRequest;
//接收緩沖區溢出,轉入kHaveNonTunnelMessage狀态
14 if (err=E2BIG)
15 fState = kHaveNonTunnelMessage;
continue;
2正常情況下,在獲得一個完整的RTSP請求後(上第12行),系統将進入kHTTPFilteringRequest狀态該狀态檢查RTSP連接配接是否需要經過HTTP代理實作;如不需要,轉入kHaveNonTunnelMessage狀态。
3進入kHaveNonTunnelMessage狀态後,系統建立了RTSPRequest類的對象fRequest,該對象解析客戶的RTSP請求,并儲存各種屬性。fRequest對象被傳遞給其他狀态處理。
4接着進入kFilteringRequest狀态,二次開發人員可以通過編寫Module對客戶的請求做出特殊處理。如果客戶的請求為正常的RTSP請求,系統調用SetupRequest函數建立用于管理資料傳輸的RTPSession類對象,其源代碼分析如下:
void RTSPSession::SetupRequest()
// 首先分析RTSP請求,細節見RTSPRequest.h/.cpp
1 QTSS_Error theErr = fRequest->Parse();
2 if (theErr != QTSS_NoErr)
3 return;
//OPTIONS請求,簡單發回标準OPTIONS響應即可
4 if (fRequest->GetMethod() = qtssOptionsMethod)
5 {//此處略去部分處理代碼…
6 }
//DESCRIBE請求,必須保證已經有了SessionID
7 if (fRequest->GetMethod() = qtssDescribeMethod)
8 {
9 if (fRequest->GetHeaderDictionary()->GetValue(qtssSessionHeader)->Len > 0)
10 {
11 (void)QTSSModuleUtils::SendErrorResponse(fRequest, qtssClientHeaderFieldNotValid, qtssMsgNoSesIDOnDescribe);
12 return;
13 }
14 }
//查找該請求的RTPSession
15 OSRefTable* theMap = QTSServerInterface::GetServer()->GetRTPSessionMap();
16 theErr = this->FindRTPSession(theMap);
17 if (theErr != QTSS_NoErr)
18 return;
//如果未查找到,建立一個新的RTPSession
19 if (fRTPSession= NULL)
20 {
21 theErr = this->CreateNewRTPSession(theMap);
22 if (theErr != QTSS_NoErr)
23 return;
24 }
5進入kRoutingRequest狀态,調用二次開發人員加入的Module,用于将該請求路由(Routing)出去。預設情況下,系統本身對此狀态不做處理。
6進入kAuthenticatingRequest狀态,調用二次開發人員加入的安全子產品,主要用于客戶身份驗證以及其他如規則的處理。讀者如果希望開發具有商業用途的流式媒體伺服器,該子產品必須進行二次開發。
7進入kPreprocessingRequest和kProcessingRequest及kPostProcessingRequest狀态,這三種狀态都是通過調用系統自帶或二次開發人員添加的Module來處理RTSP請求,例如系統提供了QTSSReflector Module、QTSSSplitter Module以及QTSSFile Module等子產品。其中比較重要的QTSSFile Module屬于QTLib庫的部分,此處不再詳述。
8進入kSendingResponse狀态,用于發送對客戶RTSP請求處理完成之後的響應。系統在該狀态調用了fOutputStream.Flush()函數将在fOutputStream中尚未發出的請求響應通過Socket端口完全發送出去。
9進入kCleaningUp狀态,清除所有上次處理的資料,并将狀态設定為kReadingRequest等待下次請求到達。
RTSPSession的主流程分析完了,但輔助其操作的多個RTSP類還需要讀者自行分析,它們分别是:RTSPSessionInterface Class、RTSPRequest Class、RTSPRequestInterface Class、RTSPRequestStream Class以及RTSPResponseStream Class等等。
4.2 RTP子系統
RTP标準是實時傳輸協定(Real-Time Transfer Protocol)的簡稱,它被客戶和流式媒體伺服器用來處理流式媒體資料的傳輸。在介紹RTSP的運作流程時,我們發現RTSPSession對象通過調用SetupRequest函數為客戶建立RTPSession對象。RTPSession類是Task類的子類,是以它重載了Task類的Run函數,該函數通過調用FileModule.cpp檔案中的SendPacket()函數向客戶發送RTP協定打包的流式媒體資料。當客戶通過利用RTSP向RTSPSession對象發出PLAY指令後,RTSPSession對象将調用RTPSession對象的Play()函數。Play函數準備好需要打包發送的資料後,利用Task類的Signal函數傳信RTPSession對象,使其被加入某個TaskThread的任務隊列,進而運作其Run函數。
另外,對同一個節目中的每一個獨立的RTP流(如音頻流或視訊流等),DSS都定義了一個RTPStream類與之對應;顯然一個RTPSession對象可能包含多個RTPStream對象。整個RTP子系統的核心運作流程見圖8。
下面,我們首先分析RTPSession中Run()函數的用法:
SInt64 RTPSession::Run()
{ //提示:該函數代碼在TaskThread内運作
1 EventFlags events = this->GetEvents(); //取出事件
2 QTSS_RoleParams theParams;
//提供給其他Module運作的參數,第一個成員是對象本身
3 theParams.clientSessionClosingParams.inClientSession = this;
//設定自己為目前運作的線程
4 OSThread::GetCurrent()->SetThreadData(&fModuleState);
/*如果事件是通知RTPSession對象死亡,就準備自殺。可能導緻這種情況的有兩種事件:自殺kKillEvent;逾時kTimeoutEvent*/
5 if ((events & Task::kKillEvent) || (events & Task::kTimeoutEvent) || (fModuleDoingAsyncStuff))
6 { //處理對象自殺代碼,此處略…
7 return –1; //傳回出錯資訊,這樣析構函數就會被調用,進而讓對象完全死亡
8 }
//如果正處于暫停(PAUSE)狀态,什麼都不做就傳回,等待PLAY指令
9 if ((fState == qtssPausedState) || (fModule == NULL))
10 return 0;
//下面代碼負責發送資料
11 { //對Session互斥量加鎖,防止發送資料過程中RTSP請求到來
12 OSMutexLocker locker(&fSessionMutex);
//設定資料包發送時間,防止被提前發送
13 theParams.rtpSendPacketsParams.inCurrentTime = OS::Milliseconds();
14 if (fPlayTime > theParams.rtpSendPacketsParams.inCurrentTime) //未到發送時間
15 theParams.rtpSendPacketsParams.outNextPacketTime=fPlayTime- theParams.rtpSendPacketsParams.inCurrentTime; //計算還需多長時間才可運作
16 else
17 { //下次運作時間的缺預設值為0
18 theParams.rtpSendPacketsParams.outNextPacketTime = 0;
// 設定Module狀态
19 fModuleState.eventRequested = false;
20 Assert(fModule != NULL);
//調用QTSS_RTPSendPackets_Role内的函數發送資料,見FileModule.cpp
21 (void)fModule->CallDispatch(QTSS_RTPSendPackets_Role, &theParams);
//将傳回值從負數改為0,否則任務對象就會被TaskThread删除
22 if (theParams.rtpSendPacketsParams.outNextPacketTime 23 theParams.rtpSendPacketsParams.outNextPacketTime = 0;
24 }
25 }
//傳回下一次希望被運作的時間;傳回值含義見前文的分析
26 return theParams.rtpSendPacketsParams.outNextPacketTime;
從上面分析可見,正常狀态下Run函數的傳回值有兩種:如果傳回值為正數,代表下一次發送資料包的時間,規定時間到來的時候,TaskThread線程會自動調用Run函數;如果傳回值等于0,在下次任何事件發生時,Run函數就會被調用,這種情況往往發生在所有資料都已經發送完成或者該RTPSession對象将要被殺死的時候。
在第21行我們看到,Run函數調用了QTSSFileModule中的QTSS_RTPSendPackets_Role發送資料。在QTSSFileModule.cpp檔案的QTSSFileModule_Main函數内,系統又調用了SendPackets函數,這才是真正發送RTP資料包的函數,我們對其代碼分析如下:
QTSS_Error SendPackets(QTSS_RTPSendPackets_Params* inParams)
//得到要發送資料的FileSession對象,其定義見QTSSFileModule.cpp檔案
1 FileSession** theFile = NULL;
2 UInt32 theLen = 0;
3 QTSS_Error theErr = QTSS_GetValuePtr(inParams->inClientSession, sFileSessionAttr, 0, (void**)&theFile, &theLen);
4 if ((theErr != QTSS_NoErr) || (theLen != sizeof(FileSession*))) //出錯
5 { //設定出錯原因,然後斷掉連接配接,并傳回
6 QTSS_CliSesTeardownReason reason = qtssCliSesTearDownServerInternalErr;
7 (void) QTSS_SetValue(inParams->inClientSession, qtssCliTeardownReason, 0, &reason, sizeof(reason));
8 (void)QTSS_Teardown(inParams->inClientSession);
9 return QTSS_RequestFailed;
10 }
//該節目檔案中音頻所能忍受的最大延遲
11 maxDelayToleranceForStream = (*theFile)->fMaxAudioDelayTolerance;
12 while (true)
13 {
//不存在待發送資料包,可能是檔案尚未打開
14 if ((*theFile)->fNextPacket == NULL)
15 {
16 void* theCookie = NULL;
//獲得第一個資料包,theTransmitTime為傳輸資料花費的時間
17 Float64 theTransmitTime = (*theFile)->fFile.GetNextPacket(&(*theFile)->fNextPacket, &(*theFile)->fNextPacketLen, &theCookie);
18 if ( QTRTPFile::errNoError != (*theFile)->fFile.Error() )
{//讀資料出錯,斷掉連接配接,傳回。此處略 }
…
19 (*theFile)->fStream = (QTSS_RTPStreamObject)theCookie; //得到RTPStream對象
20 (*theFile)->fPacketPlayTime = (*theFile)->fAdjustedPlayTime + ((SInt64)(theTransmitTime * 1000)); //推遲theTransmitTime長度的播放時間
21 (*theFile)->fPacketWasJustFetched = true;
22 if ((*theFile)->fNextPacket != NULL)
23 { // 判斷流格式
24 QTSS_RTPPayloadType* thePayloadType = NULL;
25 QTSS_Error theErr = QTSS_GetValuePtr( (*theFile)->fStream, qtssRTPStrPayloadType, 0, (void**)&thePayloadType, &theLen );
//設定視訊流可忍受的最大延遲時間
26 if (*thePayloadType == qtssVideoPayloadType)
27 maxDelayToleranceForStream = (*theFile)->fMaxVideoDelayTolerance;
28 }
29 }
//仍無資料,說明所有資料已經傳輸完成了
30 if ((*theFile)->fNextPacket = NULL)
31 { //向fStream中寫入長度為0的空資料,以便強制緩沖區重新整理
32 (void)QTSS_Write((*theFile)->fStream, NULL, 0, NULL, qtssWriteFlagsIsRTP);
33 inParams->outNextPacketTime = qtssDontCallSendPacketsAgain;
34 return QTSS_NoErr; //完成任務傳回
35 }
//提示:開始發送RTP資料包
//計算目前時間和該段資料應該發送的時間之間的相對間隔
36 SInt64 theRelativePacketTime = (*theFile)->fPacketPlayTime - inParams->inCurrentTime; // inCurrentTime = OS::Milliseconds();
37 SInt32 currentDelay = theRelativePacketTime * -1L; //計算傳輸延遲
38 theErr = QTSS_SetValue( (*theFile)->fStream, qtssRTPStrCurrentPacketDelay, 0, ¤tDelay, sizeof(currentDelay) ); //儲存該延遲
//如果延遲過大,就丢棄該包,等待發送下一個資料包
39 if (theRelativePacketTime > sMaxAdvSendTimeInMsec)
40 {
41 Assert( theRelativePacketTime > 0 );
42 inParams->outNextPacketTime = theRelativePacketTime;
43 return QTSS_NoErr;
44 }
//此處略去部分處理視訊品質的代碼…
// 發送目前資料包
45 QTSS_Error writeErr = QTSS_Write((*theFile)->fStream, (*theFile)->fNextPacket, (*theFile)->fNextPacketLen, NULL, qtssWriteFlagsIsRTP);
//其餘代碼略…
RTP子系統是DSS中最為複雜的部分之一,這是因為發送RTP資料包的過程不但涉及到網絡接口,而且和檔案系統有着密切的關系。DSS的一個重要特征就是能夠将線索化(Hinted)過的QuickTime檔案通過RTSP和RTP協定流化出去。所有分析這些檔案的代碼都被提取出來并且封裝在QTFile庫中。這種封裝方式使得系統的各個部分都變得簡單:QTFile負責處理檔案的分析;而DSS其他部分負責處理網絡和協定。伺服器中的RTPFileModule調用QTFile庫檢索索引過的QuickTime檔案的資料包和中繼資料。QTFile庫的講解超出了本文的範圍,但是希望讓DSS支援其他媒體格式的讀者能夠掌握它的實作機制。
5 DSS二次開發接口:Module開發流程
作為一個運作于多個作業系統平台的開發源代碼的伺服器,DSS提供了一種稱為Module的二次開發接口。使用這個開發接口,我們可以充分利用伺服器的可擴充性及其實作的多種協定,并且能夠保證和将來版本相容。DSS中的許多核心功能也是以Module的方式預先實作并且編譯的,是以可以說對Module的支援已經被設計到DSS的核心中去了。
下面我們将分析DSS的一個内嵌Module:QTSSFileModule的源代碼來說明Module的程式設計方式,QTSSFileModule的實作在QTSSFileModule.cpp檔案中。
每個QTSS Module必須實作兩個函數:
首先,每個QTSS Module必須實作一個主函數,伺服器調用該函數用于啟動和初始化子產品中的QTSS函數;QTSSFileModule主函數的實作如下:
QTSS_Error QTSSFileModule_Main(void* inPrivateArgs)
return _stublibrary_main(inPrivateArgs, QTSSFileModuleDispatch);
其中QTSSFileModuleDispatch是Module必須實作的分發函數名。
另一個需要實作的是分發函數,伺服器調用該函數實作某個特殊任務。此時,伺服器将向分發函數傳入任務的名字和一個任務相關的參數塊。QTSSFileModule分發函數的實作如下:
QTSS_Error QTSSFileModuleDispatch(QTSS_Role inRole, QTSS_RoleParamPtr inParamBlock)
{ //根據傳入的任務名稱和入參執行相應的處理函數
switch (inRole) //任務名稱
{
case QTSS_Register_Role:
return Register(&inParamBlock->regParams);
case QTSS_Initialize_Role:
return Initialize(&inParamBlock->initParams);
case QTSS_RereadPrefs_Role:
return RereadPrefs();
case QTSS_RTSPRequest_Role:
return ProcessRTSPRequest(&inParamBlock->rtspRequestParams);
case QTSS_RTPSendPackets_Role:
return SendPackets(&inParamBlock->rtpSendPacketsParams);
case QTSS_ClientSessionClosing_Role:
return DestroySession(&inParamBlock->clientSessionClosingParams);
return QTSS_NoErr;
其中,分發函數的入參是一個聯合,它根據任務名稱的不同,具體的資料結構也不同,下面是該資料結構的定義:
typedef union
QTSS_Register_Params regParams;
QTSS_Initialize_Params initParams;
QTSS_ErrorLog_Params errorParams;
//此處略去其他多個資料結構…
} QTSS_RoleParams, *QTSS_RoleParamPtr;
DSS提供了兩種方式把我們自己開發的Module添加到伺服器中:一種稱為靜态子產品(Static Module),該方式将我們開發的Module代碼直接編譯到核心中去;另一種稱為動态子產品(Dynamic Module),該方式将我們開發的Module單獨編譯稱為一個動态庫,然後修改配置,使伺服器在啟動時将其加載。圖9描述了DSS啟動和關閉時子產品調用流程。
當伺服器啟動時,它首先裝載沒有被編譯進核心的動态子產品,然後才裝載被編譯進核心的靜态子產品;由于現有的大部分系統功能都是以靜态子產品的方式存在的,如果你希望用自己的子產品替換某個系統功能,最好是編寫一個動态子產品,因為它們将早于靜态子產品被裝載。
無論是靜态子產品還是動态子產品,它們的代碼都是相同的,唯一的不同就是它們的編譯方式。首先為了将靜态子產品編譯到伺服器中,我們必須修改QTSServer.cpp檔案中的QTSServer::LoadCompiledInModules,并向其中加入以下代碼:
QTSSModule* myModule=new QTSSModule(*_XYZ_*);
(void)myModule->Initialize(&sCallbacks,&_XYZMAIN_);
(void)AddModule(MyModule);
其中,XYZ是靜态子產品的名字,而XYZMAIN則是其主函數入口。
動态子產品的編譯方法如下:首先單獨編譯動态子產品為一個動态共享庫;将該共享庫與QTSS API stub library連結到一起;最後将結果檔案放置到/usr/sbin/QTSSModules目錄中去。此後,伺服器在啟動時就将自動調用該動态子產品。