a) QTSS_NoErr,意味着已经读完该socket上当前所有可读的数据,但还没有收齐本次请求的全部数据,这时需要继续请求监听读事件(EV_RE),等待更多数据到达后再组成完整的RTSP请求消息。
b) QTSS_RequestArrived,意味着此时已经收到完整请求的报文,可以正常进入下一状态机了。
c) E2BIG,表示缓存区已经溢出(默认缓存区的大小为 kRequestBufferSizeInBytes = 4096),此时进入kHaveNonTunnelMessage状态机,然后在该状态机下响应错误。
// State kReadingFirstRequest: read from fInputStream via ReadRequest() and
// choose the next state from the returned code:
//   QTSS_NoErr          -> request is still incomplete; re-arm the read event and return
//   QTSS_RequestArrived -> fState = kHTTPFilteringRequest
//   E2BIG               -> fState = kHaveNonTunnelMessage (the error is answered there)
//   anything else       -> break out without changing fState
case kReadingFirstRequest:
{
if ((err = fInputStream.ReadRequest()) == QTSS_NoErr)
{
// If the RequestStream returns QTSS_NoErr, it means
// that we've read all outstanding data off the socket,
// and still don't have a full request. Wait for more data.
//+rt use the socket that reads the data, may be different now.
fInputSocketP->RequestEvent(EV_RE);
// NOTE(review): the meaning of returning 0 to the caller is not visible
// here; presumably "no idle timer, wait for the event" -- confirm.
return 0;
}
if ((err != QTSS_RequestArrived) && (err != E2BIG))
{
// Any other error implies that the client has gone away. At this point,
// we can't have 2 sockets, so we don't need to do the "half closed" check
// we do below
Assert(err > 0);
Assert(!this->IsLiveSession());
// Leaves the enclosing construct (not visible in this excerpt) with
// fState unchanged.
break;
}
// A complete request is available: continue with the HTTP filtering state.
if (err == QTSS_RequestArrived)
fState = kHTTPFilteringRequest;
// If we get an E2BIG, it means our buffer was overfilled.
// In that case, we can just jump into the following state, and
// the code there does a check for this error and returns an error.
if (err == E2BIG)
fState = kHaveNonTunnelMessage;
}
3. 状态机 kHaveNonTunnelMessage
进入此状态,说明输入流中已经有一条完整的请求报文(或者是带着E2BIG、QTSS_BadArgument错误码进入,需要在此状态下回复错误响应),请求已进入受理状态,具体操作步骤如下:
a) 创建RTSPRequest对象,用于解析RTSP消息;
b) 在此状态中对fReadMutex、fSessionMutex加锁,保证在本次请求的响应发送完成之前,其他对象不能在该会话上发送interleaved(RTP over RTSP)数据,同时防止POST会话在请求处理期间插入进来;
c) 对错误码E2BIG、QTSS_BadArgument进行处理,响应qtssClientBadRequest;
d) 将状态机跳转到kFilteringRequest下;
// State kHaveNonTunnelMessage: start handling one buffered message.
// Allocates the RTSPRequest for this session, publishes it through fRoleParams,
// takes fReadMutex and fSessionMutex, resets the response byte counter, and then
// either answers an error carried in `err` (E2BIG / QTSS_BadArgument) and moves
// to kPostProcessingRequest, or advances to kFilteringRequest.
case kHaveNonTunnelMessage:
{
// should only get here when fInputStream has a full message built.
Assert( fInputStream.GetRequestBuffer() );
// A request object left over from a previous message would be overwritten
// by the allocation below, so it must already have been cleared.
Assert(fRequest == NULL);
fRequest = NEW RTSPRequest(this);
// Make the new request and its header dictionary available to modules
// through the role parameter block.
fRoleParams.rtspRequestParams.inRTSPRequest = fRequest;
fRoleParams.rtspRequestParams.inRTSPHeaders = fRequest->GetHeaderDictionary();
// We have an RTSP request and are about to begin processing. We need to
// make sure that anyone sending interleaved data on this session won't
// be allowed to do so until we are done sending our response
// We also make sure that a POST session can't snarf in while we're
// processing the request.
// Neither lock is released inside this case.
// NOTE(review): presumably they are unlocked in a later state -- confirm.
fReadMutex.Lock();
fSessionMutex.Lock();
// The fOutputStream's fBytesWritten counter is used to
// count the # of bytes for this RTSP response. So, at
// this point, reset it to 0 (we can then just let it increment
// until the next request comes in)
fOutputStream.ResetBytesWritten();
// Check for an overfilled buffer, and return an error.
if (err == E2BIG)
{
// The send result is deliberately ignored via the (void) cast.
(void)QTSSModuleUtils::SendErrorResponse(fRequest, qtssClientBadRequest,
qtssMsgRequestTooLong);
fState = kPostProcessingRequest;
break;
}
// Check for a corrupt base64 error, return an error
if (err == QTSS_BadArgument)
{
(void)QTSSModuleUtils::SendErrorResponse(fRequest, qtssClientBadRequest,
qtssMsgBadBase64);
fState = kPostProcessingRequest;
break;
}
// With both error codes handled above, a complete request is the only
// value `err` is expected to hold here.
Assert(err == QTSS_RequestArrived);
fState = kFilteringRequest;
// Note that there is no break here. We'd like to continue onto the next
// state at this point. This goes for every case in this case statement
}
4. 状态机kFilteringRequest
a) 刷新超时任务fTimeoutTask,运转RTSP会话的超时机制。
b) 通过fIsDataPacket属性判断当前收到的是否是一个数据包,而不是一条信令消息。该属性在RTSPRequestStream::ReadRequest()中设置。interleaved方式的RTP包以$字符开头,后面紧跟一个字节的信道标识符,再后面两个字节是数据长度,Darwin就用$这个字符来区分是否为数据包。
c) 这是第一次开始调用module,角色为kRTSPFilterRole。注册了该角色的模块只有一个:QTSSRefMovieModule。
// State kFilteringRequest: refresh the session timeout, divert interleaved data
// packets to HandleIncomingDataPacket(), then run every module registered for
// kRTSPFilterRole. Afterwards the next state is one of:
//   kCleaningUp            - the input was a data packet
//   kPostProcessingRequest - a response has already been sent
//   kSendingResponse       - the message was the reply to our own OPTIONS request
//   kRoutingRequest        - normal request, set up by SetupRequest()
// The case may also return early when a module asks for a global lock or an event.
case kFilteringRequest:
{
// We received something so auto refresh
// The need to auto refresh is because the api doesn't allow a module to refresh at this point
//
fTimeoutTask.RefreshTimeout();
//
// Before we even do this, check to see if this is a *data* packet,
// in which case this isn't an RTSP request, so we don't need to go
// through any of the remaining steps
if (fInputStream.IsDataPacket()) // can this interfere with MP3?
{
this->HandleIncomingDataPacket();
fState = kCleaningUp;
break;
}
//
// In case a module wants to replace the request
// NOTE(review): both pointers are locals of this case, so they start as NULL
// again whenever this case is re-entered after an early return below, and the
// last replacement buffer is never freed here -- confirm who owns the buffer
// after SetVal().
char* theReplacedRequest = NULL;
char* oldReplacedRequest = NULL;
// Setup the filter param block
QTSS_RoleParams theFilterParams;
theFilterParams.rtspFilterParams.inRTSPSession = this;
theFilterParams.rtspFilterParams.inRTSPRequest = fRequest;
theFilterParams.rtspFilterParams.outNewRequest = &theReplacedRequest;
// Invoke filter modules
numModules = QTSServerInterface::GetNumModulesInRole(QTSSModule::kRTSPFilterRole);
// The loop starts from the member fCurrentModule rather than 0, and the early
// returns below happen before the increment, so a re-entry calls the same
// module again. It continues while no response has been sent, or
// unconditionally when an event request is still pending.
for (; (fCurrentModule < numModules) && ((!fRequest->HasResponseBeenSent()) || fModuleState.eventRequested); fCurrentModule++)
{
// Clear the per-call request flags before dispatching to the module.
fModuleState.eventRequested = false;
fModuleState.idleTime = 0;
// A global-lock request left from the previous pass is converted into the
// "currently locked" flag for this call.
if (fModuleState.globalLockRequested )
{ fModuleState.globalLockRequested = false;
fModuleState.isGlobalLocked = true;
}
theModule = QTSServerInterface::GetModule(QTSSModule::kRTSPFilterRole, fCurrentModule);
(void)theModule->CallDispatch(QTSS_RTSPFilter_Role, &theFilterParams);
fModuleState.isGlobalLocked = false;
// If this module has requested an event, return and wait for the event to transpire
if (fModuleState.globalLockRequested) // call this request back locked
return this->CallLocked();
if (fModuleState.eventRequested)
{
this->ForceSameThread(); // We are holding mutexes, so we need to force
// the same thread to be used for next Run()
return fModuleState.idleTime; // If the module has requested idle time...
}
//
// Check to see if this module has replaced the request. If so, check
// to see if there is an old replacement that we should delete
if (theReplacedRequest != NULL)
{
if (oldReplacedRequest != NULL)
delete [] oldReplacedRequest;
// Point the request's full-request attribute at the module-supplied text;
// its length is taken with strlen, so the text must be NUL-terminated.
fRequest->SetVal(qtssRTSPReqFullRequest, theReplacedRequest, ::strlen(theReplacedRequest));
oldReplacedRequest = theReplacedRequest;
theReplacedRequest = NULL;
}
}
// Reset the module cursor so the next role's loop starts from the first module.
fCurrentModule = 0;
// A filter module already answered the client: skip the remaining steps.
if (fRequest->HasResponseBeenSent())
{
fState = kPostProcessingRequest;
break;
}
// If an OPTIONS request was sent earlier (fSentOptionsRequest) and
// ParseOptionsResponse() succeeds, record the elapsed time since
// fOptionsRequestSendTime as the round-trip time.
if (fSentOptionsRequest && this->ParseOptionsResponse())
{
fRoundTripTime = (SInt32) (OS::Milliseconds() - fOptionsRequestSendTime);
//qtss_printf("RTSPSession::Run RTT time = %"_S32BITARG_" msec\n", fRoundTripTime);
fState = kSendingResponse;
break;
}
else
// Otherwise, this is a normal request, so parse it and get the RTPSession.
this->SetupRequest();
// This might happen if there is some syntax or other error,
// or if it is an OPTIONS request
if (fRequest->HasResponseBeenSent())
{
fState = kPostProcessingRequest;
break;
}
// No break after this assignment: control leaves the case block with
// fState set to kRoutingRequest.
fState = kRoutingRequest;
}
// State kPreprocessingRequest: call every module registered for
// kRTSPPreProcessorRole with fRoleParams while the RTP session mutex is held.
// Ends in kPostProcessingRequest if a module sent the response, otherwise in
// kProcessingRequest. May return early when a module asks for a global lock or
// an event.
case kPreprocessingRequest:
{
// Invoke preprocessor modules
numModules = QTSServerInterface::GetNumModulesInRole(QTSSModule::kRTSPPreProcessorRole);
{
// Manipulation of the RTPSession from the point of view of
// a module is guaranteed to be atomic by the API.
Assert(fRTPSession != NULL);
// NOTE(review): OSMutexLocker is presumably a scoped guard that releases the
// session mutex at the closing brace of this inner block (including on the
// early returns below) -- confirm.
OSMutexLocker locker(fRTPSession->GetSessionMutex());
// The loop starts from the member fCurrentModule rather than 0, and the early
// returns below happen before the increment, so a re-entry calls the same
// module again.
for (; (fCurrentModule < numModules) && ((!fRequest->HasResponseBeenSent()) || fModuleState.eventRequested); fCurrentModule++)
{
// Clear the per-call request flags before dispatching to the module.
fModuleState.eventRequested = false;
fModuleState.idleTime = 0;
// A global-lock request left from the previous pass is converted into the
// "currently locked" flag for this call.
if (fModuleState.globalLockRequested )
{ fModuleState.globalLockRequested = false;
fModuleState.isGlobalLocked = true;
}
theModule = QTSServerInterface::GetModule(QTSSModule::kRTSPPreProcessorRole, fCurrentModule);
(void)theModule->CallDispatch(QTSS_RTSPPreProcessor_Role, &fRoleParams);
fModuleState.isGlobalLocked = false;
// The way the API is set up currently, the first module that adds a stream
// to the session is responsible for sending RTP packets for the session.
if (fRTPSession->HasAnRTPStream() && (fRTPSession->GetPacketSendingModule() == NULL))
fRTPSession->SetPacketSendingModule(theModule);
if (fModuleState.globalLockRequested) // call this request back locked
return this->CallLocked();
// If this module has requested an event, return and wait for the event to transpire
if (fModuleState.eventRequested)
{
this->ForceSameThread(); // We are holding mutexes, so we need to force
// the same thread to be used for next Run()
return fModuleState.idleTime; // If the module has requested idle time...
}
}
}
// Reset the module cursor so the next role's loop starts from the first module.
fCurrentModule = 0;
// A preprocessor module already answered the client: skip request processing.
if (fRequest->HasResponseBeenSent())
{
fState = kPostProcessingRequest;
break;
}
// No break after this assignment: control leaves the case block with
// fState set to kProcessingRequest.
fState = kProcessingRequest;
}
以下是EasyDarwin的流程图