網際網路直播業務中,除私有協定外,大多採用rtmp協定進行媒體釋出或推流,很少選擇使用rtsp協定推流和釋出;但rtsp協定同樣是一個很好的協定,既支援rtp over udp,也支援rtp over rtsp,媒體控制更加靈活。下面簡單介紹一下如何採用rtsp協定進行推流或媒體資源釋出。
1、rtsp支援标準的推流擴充,通過擴充announce消息後,可以使用rtspclient主動将媒體流釋出到伺服器,然後在伺服器提供直播或點播服務,不過live555等大多開源項目預設并不支援該擴充;早年間,跟華為、上海貝爾等廠商對接過流媒體系統,使用過該擴充,用戶端實作比較簡單,這裡就不多講了;
2、基于rtsp協定實作鍊路複用,擴充register消息,将rtsp協定擴充成雙向控制協定;
本文将會主要講述上述第二種方式,實作rtsp鍊路複用,擴充支援rtsp流媒體釋出的方法;基本步驟如下:
2.1、首先,用戶端啟動後,主動向rtsp伺服器進行register注冊,并攜帶需要釋出的(1-n個)媒體資源資訊;
2.2、其次,rtsp伺服器收到register注冊消息之後,将攜帶的媒體資源進行釋出,并将該rtsp的tcp鍊路複用,作為伺服器fd進行處理;
2.3、如果rtsp伺服器接收到外部媒體資源通路或媒體流點播或直播,則複用之前注冊過的rtsp鍊路,通路遠端的用戶端進行拉流;
總體來說,就是複用rtsp協定的tcp長連接配接,進行媒體資源釋出、解決拉流時的NAT穿越問題。通信流程如下:

實際上用戶端運作的是一個rtspServer,伺服器端運作的是一個rtspProxy;雙方可以始終保持這個rtsp的tcp長連接,伺服器可以根據需要隨時點播或者直播用戶端的視訊流,而無需NAT穿越,rtp over udp傳輸也不會遇到NAT問題。
下面上代碼,live555中有一個register和deregister流程,如果不能複用rtsp連結的話,這個流程用處不大,還不如私有協定更友善;是以下面的代碼就是将這個流程修改成可以滿足上述要求的rtsp鍊路複用的業務處理邏輯。
//RTSPRegisterSender.cpp
#include "RTSPRegisterSender.hh"
#include <GroupsockHelper.hh> // for MAKE_SOCKADDR_IN
// RTSPRegisterOrDeregisterSender implementation /
// Constructs the common base of the REGISTER / DEREGISTER senders.
//   remoteClientNameOrAddress / remoteClientPortNum: the endpoint the request is sent to.
//   authenticator: optional credentials; when non-NULL they are copied into this client.
// The RTSPClient base is created without a URL, so a placeholder "rtsp://host:port/" URL
// is installed as the base URL here.
RTSPRegisterOrDeregisterSender::RTSPRegisterOrDeregisterSender(UsageEnvironment &env,
        char const *remoteClientNameOrAddress, portNumBits remoteClientPortNum,
        Authenticator *authenticator,
        int verbosityLevel, char const *applicationName)
    : RTSPClient(env, NULL, verbosityLevel, applicationName, 0, -1),
      fRemoteClientPortNum(remoteClientPortNum)
{
    char const *const urlTemplate = "rtsp://%s:%u/";
    // The 5 extra bytes plus the 4 placeholder characters already counted by strlen()
    // cover the port digits and the terminating NUL (assuming portNumBits is 16 bits wide).
    unsigned const urlCapacity = strlen(urlTemplate) + strlen(remoteClientNameOrAddress) + 5;
    char *const placeholderURL = new char[urlCapacity];
    sprintf(placeholderURL, urlTemplate, remoteClientNameOrAddress, remoteClientPortNum);
    setBaseURL(placeholderURL);
    delete[] placeholderURL;

    if(authenticator == NULL) {
        return;
    }
    fCurrentAuthenticator = *authenticator;
}
// Destructor: this class performs no cleanup of its own (the body is empty).
RTSPRegisterOrDeregisterSender::~RTSPRegisterOrDeregisterSender()
{
}
// Records the parameters of a single REGISTER or DEREGISTER request.
// Both strings are duplicated, so the caller keeps ownership of its arguments.
// When no proxy URL suffix is supplied, the value of GetLocalMac() is used instead.
RTSPRegisterOrDeregisterSender::RequestRecord_REGISTER_or_DEREGISTER::RequestRecord_REGISTER_or_DEREGISTER(unsigned cseq, char const *cmdName,
        RTSPClient::responseHandler *rtspResponseHandler,
        char const *rtspURLToRegisterOrDeregister,
        char const *proxyURLSuffix)
    : RTSPClient::RequestRecord(cseq, cmdName, rtspResponseHandler),
      fRTSPURLToRegisterOrDeregister(strDup(rtspURLToRegisterOrDeregister)),
      fProxyURLSuffix(strDup(proxyURLSuffix != NULL ? proxyURLSuffix : GetLocalMac()))
{
}
// Releases the two strings duplicated by the constructor.
RTSPRegisterOrDeregisterSender::RequestRecord_REGISTER_or_DEREGISTER::~RequestRecord_REGISTER_or_DEREGISTER()
{
    delete[] fProxyURLSuffix;
    delete[] fRTSPURLToRegisterOrDeregister;
}
// RTSPRegisterSender implementation /
// Factory for RTSPRegisterSender.
// Note that the constructor takes "proxyURLSuffix" and "reuseConnection" in a different
// position than this function does; the arguments are re-ordered accordingly.
RTSPRegisterSender *RTSPRegisterSender::createNew(UsageEnvironment &env,
        char const *remoteClientNameOrAddress, portNumBits remoteClientPortNum,
        char const *rtspURLToRegister, char const *proxyURLSuffix,
        RTSPClient::responseHandler *rtspResponseHandler, Authenticator *authenticator,
        Boolean requestStreamingViaTCP, Boolean reuseConnection,
        int verbosityLevel, char const *applicationName)
{
    RTSPRegisterSender *const sender =
        new RTSPRegisterSender(env, remoteClientNameOrAddress, remoteClientPortNum, rtspURLToRegister,
                               rtspResponseHandler, authenticator,
                               requestStreamingViaTCP, proxyURLSuffix, reuseConnection,
                               verbosityLevel, applicationName);
    return sender;
}
// Default handler for the response to our REGISTER request.
//
// On success, the registered client-side RTSP link (and its fd) is handed to a local
// RTSPServer so that the same TCP connection can be reused in the server role.
// The local server is created lazily, listening on the configured local port or, if none
// was configured, on the port returned by GetRandomRtpPorts().
//
//   rtspClient:   the sender that issued the request (treated as an RTSPRegisterSender).
//   resultCode:   0 on success; any other value is reported as an error.
//   resultString: response text; owned by this handler and released before returning
//                 (same convention as the other response handlers in this file, which
//                 delete[] the string when they do not pass it on).
//
// NOTE(review): this handler is also passed to RTSPDeregisterSender; the cast below is
// only valid when "rtspClient" really is an RTSPRegisterSender - confirm at the call sites.
void RTSPRegisterSender::defaultRegisterResponseHandler(RTSPClient *rtspClient, int resultCode, char *resultString)
{
    RTSPRegisterSender *client = (RTSPRegisterSender *)rtspClient;
    if(!client || resultCode) {
        // Passing NULL to "%s" is undefined behavior, so substitute a placeholder.
        fprintf(stderr, "response error: \n%s\n", resultString ? resultString : "(null)");
        delete[] resultString;
        return;
    }
    delete[] resultString; // not needed on the success path

    if(!client->fOurServer) {
        OutPacketBuffer::maxSize = THE_MAX_OUT_PACKET_LEN;
        portNumBits listenPort = client->getLocalListenPort() ? client->getLocalListenPort() : GetRandomRtpPorts();
        client->fOurServer = RTSPServer::createNew(client->envir(), listenPort);
        if(!client->fOurServer) {
            // Server creation can fail (e.g. the port is unavailable); do not dereference NULL.
            fprintf(stderr, "failed to create local RTSP server on port %u: %s\n",
                    (unsigned)listenPort, client->envir().getResultMsg());
            return;
        }
    }
    client->fOurServer->setReuseConnectionRequestHandler(client);
}
void RTSPRegisterSender::sendRegisterCommand()
{
// Send the "REGISTER" request:
sendRequest(new RequestRecord_REGISTER(++fCSeq, defaultRegisterResponseHandler, NULL, True, False, NULL));
}
void RTSPRegisterSender::grabConnection(int &sock, struct sockaddr_in &remoteAddress)
{
//sock = grabSocket();
sock = socketNum();
MAKE_SOCKADDR_IN(remoteAddr, fServerAddress, htons(fRemoteClientPortNum));
remoteAddress = remoteAddr;
}
// Constructs the sender and immediately queues the initial "REGISTER" request.
// The initializer list is written in the order initialization really happens
// (base class first, then members).
RTSPRegisterSender::RTSPRegisterSender(UsageEnvironment &env,
                                       char const *remoteClientNameOrAddress, portNumBits remoteClientPortNum, char const *rtspURLToRegister,
                                       RTSPClient::responseHandler *rtspResponseHandler, Authenticator *authenticator,
                                       Boolean requestStreamingViaTCP, char const *proxyURLSuffix, Boolean reuseConnection,
                                       int verbosityLevel, char const *applicationName)
    : RTSPRegisterOrDeregisterSender(env, remoteClientNameOrAddress, remoteClientPortNum,
                                     authenticator, verbosityLevel, applicationName),
      fOurServer(NULL),
      fLocalListenPortNum(0)
{
    RequestRecord_REGISTER *const initialRequest =
        new RequestRecord_REGISTER(++fCSeq, rtspResponseHandler,
                                   rtspURLToRegister, reuseConnection, requestStreamingViaTCP, proxyURLSuffix);
    sendRequest(initialRequest);
}
// Destructor: closes the RTSPServer referenced by fOurServer via Medium::close().
RTSPRegisterSender::~RTSPRegisterSender()
{
Medium::close(fOurServer);
}
// Fills in the request line URL and extra headers for an outgoing request.
// "REGISTER" requests are built here; every other command is delegated to RTSPClient.
//
// For REGISTER:
//   - if the request carries a URL, it becomes this client's base URL;
//   - cmdURL is set to the (non-allocated) base URL;
//   - extraHeaders receives a newly allocated "Transport:" header holding the
//     REGISTER-specific parameters (reuse_connection, preferred_delivery_protocol,
//     and proxy_url_suffix when present);
//   - protocolStr is left untouched.
// Returns True for REGISTER; otherwise whatever the base class returns.
Boolean RTSPRegisterSender::setRequestFields(RequestRecord *request,
        char *&cmdURL, Boolean &cmdURLWasAllocated,
        char const *&protocolStr,
        char *&extraHeaders, Boolean &extraHeadersWereAllocated)
{
    if(strcmp(request->commandName(), "REGISTER") != 0) {
        return RTSPClient::setRequestFields(request, cmdURL, cmdURLWasAllocated, protocolStr, extraHeaders, extraHeadersWereAllocated);
    }

    RequestRecord_REGISTER *registerReq = (RequestRecord_REGISTER *)request;
    if(registerReq->rtspURLToRegister()) {
        setBaseURL(registerReq->rtspURLToRegister());
    }
    cmdURL = (char *)url();
    cmdURLWasAllocated = False;

    // Optional "proxy_url_suffix" parameter (empty string when there is no suffix):
    char *suffixParam;
    if(registerReq->proxyURLSuffix() != NULL) {
        char const *suffixParamFmt = "; proxy_url_suffix=%s";
        unsigned suffixParamSize = strlen(suffixParamFmt)
                                   + strlen(registerReq->proxyURLSuffix());
        suffixParam = new char[suffixParamSize];
        sprintf(suffixParam, suffixParamFmt, registerReq->proxyURLSuffix());
    } else {
        suffixParam = strDup("");
    }

    // Assemble the complete "Transport:" header:
    char const *headerFmt = "Transport: %spreferred_delivery_protocol=%s%s\r\n";
    unsigned headerSize = strlen(headerFmt) + 100 + strlen(suffixParam);
    char *header = new char[headerSize];
    char const *reusePart = registerReq->reuseConnection() ? "reuse_connection; " : "";
    char const *deliveryPart = registerReq->requestStreamingViaTCP() ? "interleaved" : "udp";
    sprintf(header, headerFmt, reusePart, deliveryPart, suffixParam);
    delete[] suffixParam;

    extraHeaders = header;
    extraHeadersWereAllocated = True;
    return True;
}
// Record for one "REGISTER" request: the shared REGISTER/DEREGISTER data plus the
// two REGISTER-only flags (connection reuse, streaming via TCP).
RTSPRegisterSender::RequestRecord_REGISTER::RequestRecord_REGISTER(unsigned cseq, RTSPClient::responseHandler *rtspResponseHandler,
        char const *rtspURLToRegister,
        Boolean reuseConnection, Boolean requestStreamingViaTCP, char const *proxyURLSuffix)
    : RTSPRegisterOrDeregisterSender::RequestRecord_REGISTER_or_DEREGISTER(cseq, "REGISTER", rtspResponseHandler,
            rtspURLToRegister, proxyURLSuffix),
      fReuseConnection(reuseConnection),
      fRequestStreamingViaTCP(requestStreamingViaTCP)
{
}
// Destructor: nothing REGISTER-specific to release (the body is empty).
RTSPRegisterSender::RequestRecord_REGISTER::~RequestRecord_REGISTER()
{
}
// RTSPDeregisterSender implementation /
// Factory for RTSPDeregisterSender; forwards every argument unchanged to the constructor.
RTSPDeregisterSender *RTSPDeregisterSender::createNew(UsageEnvironment &env,
        char const *remoteClientNameOrAddress, portNumBits remoteClientPortNum, char const *rtspURLToDeregister,
        RTSPClient::responseHandler *rtspResponseHandler, Authenticator *authenticator,
        char const *proxyURLSuffix, int verbosityLevel, char const *applicationName)
{
    RTSPDeregisterSender *const sender =
        new RTSPDeregisterSender(env, remoteClientNameOrAddress, remoteClientPortNum, rtspURLToDeregister,
                                 rtspResponseHandler, authenticator,
                                 proxyURLSuffix, verbosityLevel, applicationName);
    return sender;
}
void RTSPDeregisterSender::sendDeregisterCommand()
{
// Send the "REGISTER" request:
sendRequest(new RequestRecord_DEREGISTER(++fCSeq, RTSPRegisterSender::defaultRegisterResponseHandler, NULL, NULL));
}
// Constructs the sender and immediately queues the "DEREGISTER" request.
RTSPDeregisterSender::RTSPDeregisterSender(UsageEnvironment &env,
        char const *remoteClientNameOrAddress, portNumBits remoteClientPortNum, char const *rtspURLToDeregister,
        RTSPClient::responseHandler *rtspResponseHandler, Authenticator *authenticator,
        char const *proxyURLSuffix,
        int verbosityLevel, char const *applicationName)
    : RTSPRegisterOrDeregisterSender(env, remoteClientNameOrAddress, remoteClientPortNum,
                                     authenticator, verbosityLevel, applicationName)
{
    RequestRecord_DEREGISTER *const initialRequest =
        new RequestRecord_DEREGISTER(++fCSeq, rtspResponseHandler, rtspURLToDeregister, proxyURLSuffix);
    sendRequest(initialRequest);
}
// Destructor: this class performs no cleanup of its own (the body is empty).
RTSPDeregisterSender::~RTSPDeregisterSender()
{
}
// Fills in the request line URL and extra headers for an outgoing request.
// "DEREGISTER" requests are built here; every other command is delegated to RTSPClient.
//
// For DEREGISTER:
//   - if the request carries a URL, it becomes this client's base URL;
//   - cmdURL is set to the (non-allocated) base URL;
//   - extraHeaders receives a newly allocated "Transport:" header whose only content is
//     the "proxy_url_suffix" parameter (or nothing when there is no suffix);
//   - protocolStr is left untouched.
// Returns True for DEREGISTER; otherwise whatever the base class returns.
Boolean RTSPDeregisterSender::setRequestFields(RequestRecord *request,
        char *&cmdURL, Boolean &cmdURLWasAllocated,
        char const *&protocolStr,
        char *&extraHeaders, Boolean &extraHeadersWereAllocated)
{
    if(strcmp(request->commandName(), "DEREGISTER") != 0) {
        return RTSPClient::setRequestFields(request, cmdURL, cmdURLWasAllocated, protocolStr, extraHeaders, extraHeadersWereAllocated);
    }

    RequestRecord_DEREGISTER *deregisterReq = (RequestRecord_DEREGISTER *)request;
    if(deregisterReq->rtspURLToDeregister()) {
        setBaseURL(deregisterReq->rtspURLToDeregister());
    }
    cmdURL = (char *)url();
    cmdURLWasAllocated = False;

    // Optional "proxy_url_suffix" parameter (empty string when there is no suffix):
    char *suffixParam;
    if(deregisterReq->proxyURLSuffix() != NULL) {
        char const *suffixParamFmt = "proxy_url_suffix=%s";
        unsigned suffixParamSize = strlen(suffixParamFmt)
                                   + strlen(deregisterReq->proxyURLSuffix());
        suffixParam = new char[suffixParamSize];
        sprintf(suffixParam, suffixParamFmt, deregisterReq->proxyURLSuffix());
    } else {
        suffixParam = strDup("");
    }

    // Assemble the complete "Transport:" header:
    char const *headerFmt = "Transport: %s\r\n";
    unsigned headerSize = strlen(headerFmt) + strlen(suffixParam);
    char *header = new char[headerSize];
    sprintf(header, headerFmt, suffixParam);
    delete[] suffixParam;

    extraHeaders = header;
    extraHeadersWereAllocated = True;
    return True;
}
RTSPDeregisterSender::RequestRecord_DEREGISTER::RequestRecord_DEREGISTER(unsigned cseq, RTSPClient::responseHandler *rtspResponseHandler,
char const *rtspURLToDeregister,
char const *proxyURLSuffix)
: RTSPRegisterOrDeregisterSender::RequestRecord_REGISTER_or_DEREGISTER(cseq, "DEREGISTER", rtspResponseHandler, rtspURLToDeregister, proxyURLSuffix)
{
}
RTSPDeregisterSender::RequestRecord_DEREGISTER::~RequestRecord_DEREGISTER()
{
}
//獨立的rtsp注冊線程,或rtsp複用後的伺服器線程。
RTSPRegister *RTSPRegister::gRtspRegister = NULL;
// Returns the process-wide RTSPRegister instance, creating it on first use.
//
// If an instance already exists, or "ip"/"port" is missing, the current value of
// gRtspRegister (possibly NULL) is returned unchanged. Otherwise a new instance is built
// with its own task scheduler and usage environment, an initial REGISTER is sent to
// ip:port using "streamId" as the proxy URL suffix, the heartbeat task is scheduled,
// and a thread running runTaskScheduler() is started.
//   username/password: credentials; an Authenticator is created only when both are non-NULL.
//   extPort:           stored on the register client as its local listen port.
RTSPRegister *RTSPRegister::createNew(char const *ip, portNumBits port, portNumBits extPort, char const *username,
                                      char const *password, char const *streamId)
{
    RTSPRegister *instance = gRtspRegister;
    if(instance || !ip || !port) {
        return instance;
    }

    Authenticator *auth = NULL;
    if(username && password) {
        auth = new Authenticator(username, password);
    }

    instance = new RTSPRegister(ip, port, extPort, auth, streamId);
    instance->fScheduler = BasicTaskScheduler::createNew();
    instance->fEnv = BasicUsageEnvironment::createNew(*instance->fScheduler);
    instance->fRegisterClient = RTSPRegisterSender::createNew(*instance->fEnv, ip, port, NULL, streamId,
                                RTSPRegisterSender::defaultRegisterResponseHandler,
                                auth);
    instance->fRegisterClient->setLocalListenPort(extPort);
    instance->fScheduler->rescheduleDelayedTask(instance->fHeartbeatTask, THE_DEFAULT_HEARTBEAT_INTERVAL * 1000,
            registerStream, instance);
    IFLY_CreateThread(runTaskScheduler, instance);
    gRtspRegister = instance;
    return instance;
}
void RTSPRegister::registerStream(void *clientdata)
{
static int errSendCount = 0;
RTSPRegister *pThis = (RTSPRegister *)clientdata;
if(pThis && pThis->fRegisterClient) {
pThis->fRegisterClient->sendRegisterCommand();
if(pThis->fEnv && strstr(pThis->fEnv->getResultMsg(), "REGISTER")) {
++errSendCount;
if(errSendCount >= THE_DEFAULT_HEARTBEAT_TIMES) {
Medium::close(pThis->fRegisterClient);
pThis->fRegisterClient = RTSPRegisterSender::createNew(*pThis->fEnv, pThis->fIp, pThis->fPort, NULL, pThis->fStreamId,
RTSPRegisterSender::defaultRegisterResponseHandler,
pThis->fAuthenticator);
pThis->fRegisterClient->setLocalListenPort(pThis->fExtPort);
pThis->fScheduler->rescheduleDelayedTask(pThis->fHeartbeatTask, THE_DEFAULT_HEARTBEAT_INTERVAL * 1000, registerStream, pThis);
} else {
pThis->fScheduler->rescheduleDelayedTask(pThis->fHeartbeatTask, 3 * MILLION, registerStream, pThis);
}
} else {
pThis->fScheduler->rescheduleDelayedTask(pThis->fHeartbeatTask, THE_DEFAULT_HEARTBEAT_INTERVAL * 1000, registerStream, pThis);
errSendCount = 0;
}
}
}
// Stores the registration parameters. "ip" and "streamId" are duplicated; the
// authenticator pointer is stored as-is (the destructor deletes it).
// The scheduler, environment and register client are not set here.
RTSPRegister::RTSPRegister(char const *ip, portNumBits port, portNumBits extPort, Authenticator *authenticator, char const *streamId)
    : fIp(strDup(ip)),
      fPort(port),
      fExtPort(extPort),
      fAuthenticator(authenticator),
      fRun(0),
      fHeartbeatTask(NULL),
      fStreamId(strDup(streamId))
{
}
// Releases the authenticator and the two strings duplicated by the constructor.
RTSPRegister::~RTSPRegister()
{
    delete fAuthenticator;
    delete [] fStreamId;
    delete [] fIp;
}
// Thread entry point for the register task scheduler.
// Tags the scheduler with this thread's id and runs the event loop, watching fRun.
// Once the loop exits, everything is torn down in order: register client, usage
// environment, scheduler, the RTSPRegister object itself, and finally the global pointer.
//   lpParamt: the RTSPRegister instance to run.
THREAD_RTN RTSPRegister::runTaskScheduler(void *lpParamt)
{
    RTSPRegister *self = (RTSPRegister *)lpParamt;
    RTSP_INFO("*************create register task scheduler thread: %ld\n", IFLY_GetThreadId());

    TaskScheduler &scheduler = self->fEnv->taskScheduler();
    scheduler.setId(IFLY_GetThreadId());
    scheduler.doEventLoop(&self->fRun);

    Medium::close(self->fRegisterClient);
    self->fRegisterClient = NULL;

    self->fEnv->reclaim();
    self->fEnv = NULL;

    delete self->fScheduler;
    self->fScheduler = NULL;

    delete self;
    gRtspRegister = NULL;
    RTSP_INFO("*************delete register task scheduler thread: %ld\n", IFLY_GetThreadId());
    return 0;
}
void RTSPRegister::shutdown()
{
RTSPRegister *pThis = createNew();
if(pThis) {
pThis->fScheduler->unscheduleDelayedTask(pThis->fHeartbeatTask);
pThis->fRun = 1;
}
}
//RTSPServerRegister.cpp
#include "RTSPServer.hh"
#include "RTSPCommon.hh"
#include "RTSPRegisterSender.hh"
#include "ProxyServerMediaSession.hh"
#include "GroupsockHelper.hh"
// Implementation of "RTSPServer::registerStream()": //
static void rtspRegisterResponseHandler(RTSPClient *rtspClient, int resultCode, char *resultString); // forward
// A class that represents the state of a "REGISTER" request in progress:
class RegisterRequestRecord: public RTSPRegisterSender
{
public:
RegisterRequestRecord(RTSPServer &ourServer, unsigned requestId,
char const *remoteClientNameOrAddress, portNumBits remoteClientPortNum, char const *rtspURLToRegister,
RTSPServer::responseHandlerForREGISTER *responseHandler, Authenticator *authenticator,
Boolean requestStreamingViaTCP, char const *proxyURLSuffix)
: RTSPRegisterSender(ourServer.envir(), remoteClientNameOrAddress, remoteClientPortNum, rtspURLToRegister,
rtspRegisterResponseHandler, authenticator,
requestStreamingViaTCP, proxyURLSuffix, True,
#ifdef DEBUG
1,
#else
0,
#endif
NULL),
fOurServer(ourServer), fRequestId(requestId), fResponseHandler(responseHandler)
{
// Add ourself to our server's 'pending REGISTER or DEREGISTER requests' table:
ourServer.fPendingRegisterOrDeregisterRequests->Add((char const *)this, this);
}
virtual ~RegisterRequestRecord()
{
// Remove ourself from the server's 'pending REGISTER or DEREGISTER requests' hash table before we go:
fOurServer.fPendingRegisterOrDeregisterRequests->Remove((char const *)this);
}
void handleResponse(int resultCode, char *resultString)
{
if(resultCode == 0) {
// The "REGISTER" request succeeded, so use the still-open RTSP socket to await incoming commands from the remote endpoint:
int sock;
struct sockaddr_in remoteAddress;
grabConnection(sock, remoteAddress);
if(sock >= 0) {
increaseSendBufferTo(envir(), sock, 50 * 1024); // in anticipation of streaming over it
(void)fOurServer.createNewClientConnection(sock, remoteAddress);
}
}
if(fResponseHandler != NULL) {
// Call our (REGISTER-specific) response handler now:
(*fResponseHandler)(&fOurServer, fRequestId, resultCode, resultString);
} else {
// We need to delete[] "resultString" before we leave:
delete[] resultString;
}
// We're completely done with the REGISTER command now, so delete ourself now:
delete this;
}
private:
RTSPServer &fOurServer;
unsigned fRequestId;
RTSPServer::responseHandlerForREGISTER *fResponseHandler;
};
// RTSPClient-style response callback: forwards to the RegisterRequestRecord that sent the request.
static void rtspRegisterResponseHandler(RTSPClient *rtspClient, int resultCode, char *resultString)
{
    RegisterRequestRecord *const record = (RegisterRequestRecord *)rtspClient;
    record->handleResponse(resultCode, resultString);
}
// Sends a "REGISTER" command for "serverMediaSession" to a remote endpoint.
//   remoteClientNameOrAddress / remoteClientPortNum: where the command is sent.
//   responseHandler: optional; called with the request id and the result once a response arrives.
//   username / password: optional credentials; a NULL password with a non-NULL username
//                        is treated as the empty string.
//   receiveOurStreamViaTCP, proxyURLSuffix: passed through to the request.
// Returns the id assigned to this request.
//
// The RegisterRequestRecord created here deletes itself after handling the response.
unsigned RTSPServer::registerStream(ServerMediaSession *serverMediaSession,
                                    char const *remoteClientNameOrAddress, portNumBits remoteClientPortNum,
                                    responseHandlerForREGISTER *responseHandler,
                                    char const *username, char const *password,
                                    Boolean receiveOurStreamViaTCP, char const *proxyURLSuffix)
{
    Authenticator *authenticator = NULL;
    if(username != NULL) {
        if(password == NULL) { password = ""; }
        authenticator = new Authenticator(username, password);
    }
    unsigned requestId = ++fRegisterOrDeregisterRequestCounter;

    // "rtspURL()" returns a dynamically allocated string that the caller must free;
    // keep the pointer so it can be released below instead of being leaked.
    char const *url = rtspURL(serverMediaSession);
    new RegisterRequestRecord(*this, requestId,
                              remoteClientNameOrAddress, remoteClientPortNum, url,
                              responseHandler, authenticator,
                              receiveOurStreamViaTCP, proxyURLSuffix);
    delete[] url; // safe: the request record stores its own strDup()'d copy
    delete authenticator; // safe: the sender copied the authenticator by value
    return requestId;
}
// Implementation of "RTSPServer::deregisterStream()": //
static void rtspDeregisterResponseHandler(RTSPClient *rtspClient, int resultCode, char *resultString); // forward
// A class that represents the state of a "DEREGISTER" request in progress:
// State of one "DEREGISTER" request issued by RTSPServer::deregisterStream().
// The object registers itself in the server's pending-request table on construction,
// removes itself on destruction, and deletes itself once the response has been handled.
class DeregisterRequestRecord: public RTSPDeregisterSender
{
public:
    DeregisterRequestRecord(RTSPServer &ourServer, unsigned requestId,
                            char const *remoteClientNameOrAddress, portNumBits remoteClientPortNum, char const *rtspURLToDeregister,
                            RTSPServer::responseHandlerForDEREGISTER *responseHandler, Authenticator *authenticator,
                            char const *proxyURLSuffix)
        : RTSPDeregisterSender(ourServer.envir(), remoteClientNameOrAddress, remoteClientPortNum, rtspURLToDeregister,
                               rtspDeregisterResponseHandler, authenticator, proxyURLSuffix,
#ifdef DEBUG
                               1,
#else
                               0,
#endif
                               NULL),
          fOurServer(ourServer),
          fRequestId(requestId),
          fResponseHandler(responseHandler)
    {
        // Enter this record into the server's table of pending REGISTER/DEREGISTER requests:
        ourServer.fPendingRegisterOrDeregisterRequests->Add((char const *)this, this);
    }

    virtual ~DeregisterRequestRecord()
    {
        // Take this record out of the server's pending-request table again:
        fOurServer.fPendingRegisterOrDeregisterRequests->Remove((char const *)this);
    }

    // Handles the response to the DEREGISTER request, then destroys this object.
    //   resultString: handed to the user's handler if there is one, otherwise freed here.
    void handleResponse(int resultCode, char *resultString)
    {
        if(fResponseHandler == NULL) {
            // Nobody else will see "resultString", so release it here:
            delete[] resultString;
        } else {
            // Forward the outcome to the DEREGISTER-specific handler:
            (*fResponseHandler)(&fOurServer, fRequestId, resultCode, resultString);
        }

        // The DEREGISTER exchange is finished; this record is no longer needed:
        delete this;
    }

private:
    RTSPServer &fOurServer;
    unsigned fRequestId;
    RTSPServer::responseHandlerForDEREGISTER *fResponseHandler;
};
// RTSPClient-style response callback: forwards to the DeregisterRequestRecord that sent the request.
static void rtspDeregisterResponseHandler(RTSPClient *rtspClient, int resultCode, char *resultString)
{
    DeregisterRequestRecord *const record = (DeregisterRequestRecord *)rtspClient;
    record->handleResponse(resultCode, resultString);
}
// Sends a "DEREGISTER" command for "serverMediaSession" to a remote endpoint.
//   remoteClientNameOrAddress / remoteClientPortNum: where the command is sent.
//   responseHandler: optional; called with the request id and the result once a response arrives.
//   username / password: optional credentials; a NULL password with a non-NULL username
//                        is treated as the empty string.
//   proxyURLSuffix: passed through to the request.
// Returns the id assigned to this request.
//
// The DeregisterRequestRecord created here deletes itself after handling the response.
unsigned RTSPServer::deregisterStream(ServerMediaSession *serverMediaSession,
                                      char const *remoteClientNameOrAddress, portNumBits remoteClientPortNum,
                                      responseHandlerForDEREGISTER *responseHandler,
                                      char const *username, char const *password,
                                      char const *proxyURLSuffix)
{
    Authenticator *authenticator = NULL;
    if(username != NULL) {
        if(password == NULL) { password = ""; }
        authenticator = new Authenticator(username, password);
    }
    unsigned requestId = ++fRegisterOrDeregisterRequestCounter;

    // "rtspURL()" returns a dynamically allocated string that the caller must free;
    // keep the pointer so it can be released below instead of being leaked.
    char const *url = rtspURL(serverMediaSession);
    new DeregisterRequestRecord(*this, requestId,
                                remoteClientNameOrAddress, remoteClientPortNum, url,
                                responseHandler, authenticator,
                                proxyURLSuffix);
    delete[] url; // safe: the request record stores its own strDup()'d copy
    delete authenticator; // safe: the sender copied the authenticator by value
    return requestId;
}
// Base-class hook that tells whether this server handles "REGISTER"/"DEREGISTER".
// Both unnamed parameters are ignored. Always sets "responseStr" to NULL and returns False.
Boolean RTSPServer::weImplementREGISTER(char const * ,
char const * , char *&responseStr)
{
// By default, servers do not implement our custom "REGISTER"/"DEREGISTER" commands:
responseStr = NULL;
return False;
}
// Base-class hook that carries out a "REGISTER"/"DEREGISTER" command.
// All parameters are unnamed and ignored; the body does nothing.
void RTSPServer::implementCmd_REGISTER(char const * ,
char const * , char const * , int ,
Boolean , char const * )
{
// By default, this function is a 'noop'
}
// Special mechanism for handling our custom "REGISTER" command:
// Snapshot of a REGISTER/DEREGISTER command's parameters, kept until the delayed
// handling task runs. Every string argument is duplicated with strDup().
RTSPServer::RTSPClientConnection::ParamsForREGISTER::ParamsForREGISTER(char const *cmd,
        RTSPServer::RTSPClientConnection *ourConnection, char const *url, char const *urlSuffix,
        Boolean reuseConnection, Boolean deliverViaTCP, char const *proxyURLSuffix)
    : fCmd(strDup(cmd)),
      fOurConnection(ourConnection),
      fURL(strDup(url)),
      fURLSuffix(strDup(urlSuffix)),
      fReuseConnection(reuseConnection),
      fDeliverViaTCP(deliverViaTCP),
      fProxyURLSuffix(strDup(proxyURLSuffix))
{
}
// Frees the four strings duplicated by the constructor.
RTSPServer::RTSPClientConnection::ParamsForREGISTER::~ParamsForREGISTER()
{
    delete[] fProxyURLSuffix;
    delete[] fURLSuffix;
    delete[] fURL;
    delete[](char *)fCmd;
}
#define DELAY_USECS_AFTER_REGISTER_RESPONSE 100000
// Handles an incoming "REGISTER"/"DEREGISTER" command on this connection.
//
// Three outcomes, depending on the server's weImplementREGISTER():
//   1. It returns True: after the access-control check, reply ("200 OK" unless the server
//      supplied a response string) and schedule the real work (continueHandlingREGISTER)
//      to run DELAY_USECS_AFTER_REGISTER_RESPONSE microseconds later.
//   2. It returns False but supplies a response string: reply with that string and
//      schedule the same delayed task, with the command forced to "DEREGISTER".
//   3. It returns False with no response string: reply "not supported".
//
// NOTE(review): "responseStr" is declared without an initializer; this relies on
// weImplementREGISTER() assigning it on every path (the implementations visible in this
// file do). Also, if authenticationOK() fails, a non-NULL "responseStr" would not be freed.
void RTSPServer::RTSPClientConnection::handleCmd_REGISTER(char const *cmd,
char const *url, char const *urlSuffix, char const *fullRequestStr,
Boolean reuseConnection, Boolean deliverViaTCP, char const *proxyURLSuffix)
{
char *responseStr;
if(fOurRTSPServer.weImplementREGISTER(cmd, proxyURLSuffix, responseStr)) {
// The "REGISTER"/"DEREGISTER" command - if we implement it - may require access control:
if(!authenticationOK(cmd, urlSuffix, fullRequestStr)) { return; }
// We implement the "REGISTER"/"DEREGISTER" command by first replying to it, then actually
// handling it (in a separate event-loop task, that will get called after the reply has
// been done).
// Hack: If we're going to reuse the command's connection for subsequent RTSP commands, then we
// delay the actual handling of the command slightly, to make it less likely that the first
// subsequent RTSP command (e.g., "DESCRIBE") will end up in the client's response buffer before
// the socket (at the far end) gets reused for RTSP command handling.
setRTSPResponse(responseStr == NULL ? "200 OK" : responseStr);
delete[] responseStr;
// Balanced by the decrement in continueHandlingREGISTER1() when the connection is not reused:
++fRecursionCount;
ParamsForREGISTER *registerParams = new ParamsForREGISTER(cmd, this, url, urlSuffix, reuseConnection, deliverViaTCP, proxyURLSuffix);
envir().taskScheduler().scheduleDelayedTask(DELAY_USECS_AFTER_REGISTER_RESPONSE, (TaskFunc *)continueHandlingREGISTER, registerParams);
} else if(responseStr != NULL) {
// The server rejected the command but gave us a response to send. Send it, then run the
// delayed handler as a "DEREGISTER", whatever the original command was.
setRTSPResponse(responseStr);
delete[] responseStr;
++fRecursionCount;
ParamsForREGISTER *registerParams = new ParamsForREGISTER("DEREGISTER", this, url, urlSuffix, reuseConnection, deliverViaTCP, proxyURLSuffix);
envir().taskScheduler().scheduleDelayedTask(DELAY_USECS_AFTER_REGISTER_RESPONSE, (TaskFunc *)continueHandlingREGISTER, registerParams);
} else {
handleCmd_notSupported();
}
}
// Delayed-task entry point: hands "params" to the connection stored inside it.
void RTSPServer::RTSPClientConnection::continueHandlingREGISTER(ParamsForREGISTER *params)
{
    RTSPClientConnection *const connection = params->fOurConnection;
    connection->continueHandlingREGISTER1(params);
}
// Second half of REGISTER/DEREGISTER handling, run from the delayed task.
// Calls the server's implementCmd_REGISTER(), frees "params", and then either
//   - (connection reuse requested) detaches both socket members and deletes this
//     connection object, leaving the socket itself open, or
//   - decrements fRecursionCount (undoing the increment made in handleCmd_REGISTER()).
void RTSPServer::RTSPClientConnection::continueHandlingREGISTER1(ParamsForREGISTER *params)
{
// Reuse our socket if requested:
int socketNumToBackEndServer = params->fReuseConnection ? fClientOutputSocket : -1;
RTSPServer *ourServer = &fOurRTSPServer; // copy the pointer now, in case we "delete this" below
ourServer->implementCmd_REGISTER(params->fCmd,
params->fURL, params->fURLSuffix, socketNumToBackEndServer,
params->fDeliverViaTCP, params->fProxyURLSuffix);
delete params;
if(socketNumToBackEndServer >= 0) {
// Because our socket will no longer be used by the server to handle incoming requests, we can now delete this
// "RTSPClientConnection" object.
// NOTE(review): in this version "implementCmd_REGISTER()" has already been called above, so members of
// "this" are still touched after that call - confirm that the call can never delete this connection.
fClientInputSocket = fClientOutputSocket = -1; // so the socket doesn't get closed when we get deleted
RTSP_INFO("Switch server connection into client connection, not real release fd[%d].\n", socketNumToBackEndServer);
delete this;
} else {
--fRecursionCount;
}
}
// RTSPServerWithREGISTERProxying implementation //
// Creates the proxying RTSP server listening on "ourPort" and stores it in fSelf.
// Returns NULL if the listening socket cannot be set up.
//   streamOutType: copied into the new server's "streamOutType" field after construction.
// All other arguments are forwarded to the constructor.
//
// NOTE(review): if a server already exists, only a warning is logged; a new server is
// still created and fSelf is overwritten - confirm this is intended.
RTSPServerWithREGISTERProxying *RTSPServerWithREGISTERProxying::createNew(UsageEnvironment &env, Port ourPort,
        UserAuthenticationDatabase *authDatabase, UserAuthenticationDatabase *authDatabaseForREGISTER,
        unsigned reclamationSeconds, unsigned subThreadNum,
        Boolean streamRTPOverTCP, int verbosityLevelForProxying, int streamOutType,
        char const *backEndUsername, char const *backEndPassword)
{
    if(fSelf) {
        // The scheduler id is logged with "%ld" elsewhere in this code; the previous "%d"
        // did not match. The explicit cast keeps format and argument in agreement.
        RTSP_WARN("RTSPServer already created in thread: %ld\n", (long)fSelf->envir().taskScheduler().getId());
    }
    int ourSocket = setUpOurSocket(env, ourPort);
    if(ourSocket == -1) {
        return NULL;
    }
    fSelf = new RTSPServerWithREGISTERProxying(env, ourSocket, ourPort,
            authDatabase, authDatabaseForREGISTER,
            reclamationSeconds, subThreadNum,
            streamRTPOverTCP, verbosityLevelForProxying,
            backEndUsername, backEndPassword);
    fSelf->streamOutType = streamOutType;
    return (RTSPServerWithREGISTERProxying *)fSelf;
}
// Initializes the RTSPServer base and the proxying-specific state.
// The back-end user name and password are duplicated with strDup().
RTSPServerWithREGISTERProxying::RTSPServerWithREGISTERProxying(UsageEnvironment &env, int ourSocket, Port ourPort,
        UserAuthenticationDatabase *authDatabase, UserAuthenticationDatabase *authDatabaseForREGISTER,
        unsigned reclamationSeconds, unsigned subThreadNum,
        Boolean streamRTPOverTCP, int verbosityLevelForProxying,
        char const *backEndUsername, char const *backEndPassword)
    : RTSPServer(env, ourSocket, ourPort, authDatabase, reclamationSeconds, subThreadNum,
                 streamRTPOverTCP, verbosityLevelForProxying),
      fRegisteredProxyCounter(0),
      fAllowedCommandNames(NULL),
      fAuthDBForREGISTER(authDatabaseForREGISTER),
      fBackEndUsername(strDup(backEndUsername)),
      fBackEndPassword(strDup(backEndPassword))
{
}
// Frees the cached command-name list and the duplicated back-end credentials.
RTSPServerWithREGISTERProxying::~RTSPServerWithREGISTERProxying()
{
    delete[] fBackEndPassword;
    delete[] fBackEndUsername;
    delete[] fAllowedCommandNames;
}
// Returns the base server's allowed command names with ", REGISTER, DEREGISTER" appended.
// The combined string is built on the first call and cached in fAllowedCommandNames.
char const *RTSPServerWithREGISTERProxying::allowedCommandNames()
{
    if(fAllowedCommandNames != NULL) {
        return fAllowedCommandNames;
    }

    char const *inheritedNames = RTSPServer::allowedCommandNames();
    char const *extraNames = ", REGISTER, DEREGISTER";
    fAllowedCommandNames = new char[strlen(inheritedNames) + strlen(extraNames) + 1];
    sprintf(fAllowedCommandNames, "%s%s", inheritedNames, extraNames);
    return fAllowedCommandNames;
}
// Decides whether the given REGISTER/DEREGISTER command will be carried out.
// With a non-NULL "proxyURLSuffix", the command is refused when
//   - it is a REGISTER and a session with that name already exists, or
//   - it is a DEREGISTER and no session with that name exists.
// On refusal, "responseStr" receives a newly allocated "451 Invalid parameter" and the
// result is False. Otherwise "responseStr" is set to NULL and the result is True.
Boolean RTSPServerWithREGISTERProxying::weImplementREGISTER(char const *cmd,
        char const *proxyURLSuffix, char *&responseStr)
{
    if(proxyURLSuffix != NULL) {
        bool const sessionExists = lookupServerMediaSession(proxyURLSuffix) != NULL;
        bool const duplicateRegister = strcmp(cmd, "REGISTER") == 0 && sessionExists;
        bool const unknownDeregister = strcmp(cmd, "DEREGISTER") == 0 && !sessionExists;
        if(duplicateRegister || unknownDeregister) {
            responseStr = strDup("451 Invalid parameter");
            return False;
        }
    }

    // Nothing stands in the way, so the command will be implemented:
    responseStr = NULL;
    return True;
}
// Carries out a REGISTER or DEREGISTER command for a proxied stream.
//   cmd:                  "REGISTER" or "DEREGISTER"; anything else is ignored.
//   url:                  not used by this implementation.
//   socketToRemoteServer: the reused connection socket; handed to the new proxy session on
//                         REGISTER, closed on DEREGISTER.
//   deliverViaTCP:        not used by this implementation.
//   proxyURLSuffix:       front-end stream name; when NULL a "registeredProxyStream-N" name is generated.
// REGISTER only acts when no session with that name exists; DEREGISTER only when one does.
void RTSPServerWithREGISTERProxying::implementCmd_REGISTER(char const *cmd,
char const *url, char const * , int socketToRemoteServer,
Boolean deliverViaTCP, char const *proxyURLSuffix)
{
// Continue setting up proxying for the specified URL.
// By default:
// - We use "registeredProxyStream-N" as the (front-end) stream name (ignoring the back-end stream's 'urlSuffix'),
// unless "proxyURLSuffix" is non-NULL (in which case we use that)
// - There is no 'username' and 'password' for the back-end stream. (Thus, access-controlled back-end streams will fail.)
// - If "fStreamRTPOverTCP" is True, then we request delivery over TCP, regardless of the value of "deliverViaTCP".
// (Otherwise, if "fStreamRTPOverTCP" is False, we use the value of "deliverViaTCP" to decide this.)
// To change this default behavior, you will need to subclass "RTSPServerWithREGISTERProxying", and reimplement this function.
// NOTE(review): the list above looks inherited; this version passes fBackEndUsername/fBackEndPassword
// to the proxy session and never reads "deliverViaTCP" - confirm and update.
ProxyServerMediaSession *sms = NULL;
char const *proxyStreamName = NULL;
char proxyStreamNameBuf[100] = {0};
struct sockaddr_in name = {0};
int len = sizeof(struct sockaddr);
if(proxyURLSuffix == NULL) {
sprintf(proxyStreamNameBuf, "registeredProxyStream-%u", ++fRegisteredProxyCounter);
proxyStreamName = proxyStreamNameBuf;
} else {
proxyStreamName = proxyURLSuffix;
}
// The lookup result is treated as a ProxyServerMediaSession (unchecked cast):
sms = (ProxyServerMediaSession *)lookupServerMediaSession(proxyStreamName);
if(strcmp(cmd, "REGISTER") == 0 && !sms) {
portNumBits tunnelOverHTTPPortNum = getStreamRTPOverTCP() ? (portNumBits)(~0) : 0;
// We don't support streaming from the back-end via RTSP/RTP/RTCP-over-HTTP; only via RTP/RTCP-over-TCP or RTP/RTCP-over-UDP
sms = ProxyServerMediaSession::createNew(envir(), this, NULL, proxyStreamName, fBackEndUsername, fBackEndPassword,
tunnelOverHTTPPortNum, getVerbosityLevel(), socketToRemoteServer);
// Hold a reference of our own; it is dropped again by the DEREGISTER branch below.
sms->incrementReferenceCount();
addServerMediaSession(sms);
// (Regardless of the verbosity level) announce the fact that we're proxying this new stream, and the URL to use to access it:
if(sms->fProxyRTSPClient && !getpeername(sms->fProxyRTSPClient->socketNum(), (struct sockaddr *)&name, (socklen_t *)&len)) {
char *urlPrefix = rtspURLPrefix(sms->fProxyRTSPClient->socketNum());
RTSP_INFO("Proxying the registered rtspserver: rtsp://%s/streamId/real, by the url: %sstreamId/real?proxyid=%s\n",
inet_ntoa(name.sin_addr), urlPrefix, sms->streamName());
delete[] urlPrefix;
}
} else if(strcmp(cmd, "DEREGISTER") == 0 && sms) { // "DEREGISTER"
// Drop our reference, remove the session, then stop watching and close the reused socket:
sms->decrementReferenceCount();
deleteServerMediaSession(sms);
envir().taskScheduler().disableBackgroundHandling(socketToRemoteServer);
::closeSocket(socketToRemoteServer);
RTSP_INFO("closed socket[%d]\n", socketToRemoteServer);
// Log every remaining session whose proxy client has no URL and whose socket still has a peer.
// Each session returned by the iterator is treated as a ProxyServerMediaSession (unchecked cast).
GenericMediaServer::ServerMediaSessionIterator *iter = new GenericMediaServer::ServerMediaSessionIterator(*this);
while((sms = (ProxyServerMediaSession *)iter->next()) != NULL) {
if(sms->fProxyRTSPClient && !sms->fProxyRTSPClient->url()
&& !getpeername(sms->fProxyRTSPClient->socketNum(), (struct sockaddr *)&name, (socklen_t *)&len)) {
char *urlPrefix = rtspURLPrefix(sms->fProxyRTSPClient->socketNum());
RTSP_INFO("Proxying the registered rtspserver: rtsp://%s/streamId/real, by the url: %sstreamId/real?proxyid=%s\n",
inet_ntoa(name.sin_addr), urlPrefix, sms->streamName());
delete[] urlPrefix;
}
}
delete iter;
}
}
// Selects the authentication database for a command: "REGISTER" uses the dedicated
// database given at construction; every other command (including "DEREGISTER") falls
// back to the base class's choice.
UserAuthenticationDatabase *RTSPServerWithREGISTERProxying::getAuthenticationDatabaseForCommand(char const *cmdName)
{
    if(strcmp(cmdName, "REGISTER") != 0) {
        return RTSPServer::getAuthenticationDatabaseForCommand(cmdName);
    }
    return fAuthDBForREGISTER;
}
//main函數中調用:
// Excerpt from main(): the variables used here (env, rtspIp, rtspPort, url, streamId, ...) are declared outside this snippet.
// Create an Authenticator from the username/password only when authentication was requested:
if(needAuth) {
ourAuthenticator = new Authenticator(username, password);
}
// Create a DEREGISTER sender for the same stream before creating the REGISTER sender below
// (presumably to clear any stale registration on the server - confirm):
RTSPDeregisterSender::createNew(*env, rtspIp, rtspPort, url,
RTSPRegisterSender::defaultRegisterResponseHandler,
ourAuthenticator, streamId, verbosityLevel, name);
// We have the command-line arguments. Send the command:
RTSPRegisterSender::createNew(*env, rtspIp, rtspPort, url, streamId,
RTSPRegisterSender::defaultRegisterResponseHandler,
ourAuthenticator, streamRTPOverTCP, True,
verbosityLevel, name);
// Note: This object will be deleted later, by the response handler
// Hand control to the event loop; all further work happens in its callbacks:
env->taskScheduler().doEventLoop(); // does not return
以上是用戶端代碼,下面是幾個伺服器需要修改的地方:
在int RTSPServer::RTSPClientConnection::handleRequestBytes(int newBytesRead)中補充以下代碼:
………………
// Excerpt from the command-dispatch chain of handleRequestBytes().
// Commands that operate within an existing session are forwarded to that session, if it exists:
} else if(strcmp(cmdName, "TEARDOWN") == 0
|| strcmp(cmdName, "PLAY") == 0
|| strcmp(cmdName, "PAUSE") == 0
|| strcmp(cmdName, "GET_PARAMETER") == 0
|| strcmp(cmdName, "SET_PARAMETER") == 0) {
if(clientSession != NULL) {
clientSession->handleCmd_withinSession(this, cmdName, urlPreSuffix, urlSuffix, (char const *)fRequestBuffer);
} else {
handleCmd_sessionNotFound();
}
} else if(strcmp(cmdName, "REGISTER") == 0 || strcmp(cmdName, "DEREGISTER") == 0) {
// Because - unlike other commands - an implementation of this command needs
// the entire URL, we re-parse the command to get it:
// ("url" is allocated with the same size as the whole request, so the "%s" scan below cannot overrun it)
char *url = strDupSize((char *)fRequestBuffer);
if(sscanf((char *)fRequestBuffer, "%*s %s", url) == 1) {
// Check for special command-specific parameters in a "Transport:" header:
Boolean reuseConnection, deliverViaTCP;
char *proxyURLSuffix = NULL;
parseTransportHeaderForREGISTER((const char *)fRequestBuffer, reuseConnection, deliverViaTCP, proxyURLSuffix);
// NOTE(review): "proxyURLSuffix" is passed to the lookup as-is; whether it can still be NULL here depends on
// parseTransportHeaderForREGISTER(), which is not shown - confirm.
ServerMediaSession *sms = fOurRTSPServer.lookupServerMediaSession(proxyURLSuffix);
if(sms && sms->envir().taskScheduler().getId() != IFLY_GetThreadId()) {
// The network reconnected during communication, so this socket must be handled by the scheduler that owns
// the existing session instead of the current one. The switch made here takes effect for the next request;
// the current request is answered via handleCmd_sessionNotFound().
RTSP_INFO("ClientSocket[%d] Change SubThread[%ld->%ld] Handle\n",
fOurSocket, IFLY_GetThreadId(), sms->envir().taskScheduler().getId());
envir().taskScheduler().disableBackgroundHandling(fOurSocket);
sms->envir().taskScheduler().setBackgroundHandling(fOurSocket, SOCKET_READABLE | SOCKET_EXCEPTION,
incomingRequestHandler, this);
updateEnvir(sms->envir());
handleCmd_sessionNotFound();
} else {
handleCmd_REGISTER(cmdName, url, urlSuffix, (char const *)fRequestBuffer, reuseConnection, deliverViaTCP, proxyURLSuffix);
}
delete[] proxyURLSuffix;
} else {
handleCmd_bad();
}
delete[] url;
}
……………………
在RTSPClient.cpp中更新函數:
// Processes "newBytesRead" bytes that have been appended to "fResponseBuffer".
//  - If "newBytesRead" is negative, or would fill the remaining buffer space, this is treated as an error and
//    pending request handlers are called (via handleRequestError()).
//  - Otherwise the accumulated data is scanned for a complete set of headers. Once one is present (plus any body
//    announced by "Content-Length:"), the matching request record is found by "CSeq:" and its handler is called.
//  - Data that does not parse as a RTSP response is passed to handleIncomingRequest().
//  - If bytes remain after a complete response (pipelined responses), they are moved to the front of the buffer
//    and the parsing loop runs again.
void RTSPClient::handleResponseBytes(int newBytesRead)
{
do {
if(newBytesRead >= 0 && (unsigned)newBytesRead < fResponseBufferBytesLeft) { break; } // data was read OK; process it below
if(newBytesRead >= (int)fResponseBufferBytesLeft) {
// We filled up our response buffer. Treat this as an error (for the first response handler):
envir().setResultMsg("RTSP response was truncated. Increase \"RTSPClient::responseBufferSize\"");
}
// An error occurred while reading our TCP socket. Call all pending response handlers, indicating this error.
// (However, the "RTSP response was truncated" error is applied to the first response handler only.)
resetResponseBuffer();
RequestRecord *request;
if(newBytesRead > 0) { // The "RTSP response was truncated" error
if((request = fRequestsAwaitingResponse.dequeue()) != NULL) {
handleRequestError(request);
delete request;
}
} else {
// Take over the pending requests into a local queue before the sockets are reset, then fail each of them:
RequestQueue requestQueue(fRequestsAwaitingResponse);
resetTCPSockets(); // do this now, in case an error handler deletes "this"
while((request = requestQueue.dequeue()) != NULL) {
handleRequestError(request);
delete request;
}
// Extension for REGISTER link reuse: a client with no server address (fServerAddress == -1) whose output socket
// is now closed is reported through handleRegister() as a "DEREGISTER".
// NOTE(review): presumably fServerAddress == -1 identifies a client created on a reused REGISTER link - confirm.
// NOTE(review): members are read here after the error handlers ran; the comment above warns that a handler may
// delete "this" - confirm that cannot happen for such clients.
if(fServerAddress == -1 && fOutputSocketNum < 0) {
RTSP_WARN("rtsp register link was disconnect!\n");
handleRegister("DEREGISTER", NULL, NULL, fOutputSocketNum);
}
}
return;
} while(0);
// Account for the new bytes, and keep the accumulated data '\0'-terminated so that it can be scanned as a string:
fResponseBufferBytesLeft -= newBytesRead;
fResponseBytesAlreadySeen += newBytesRead;
fResponseBuffer[fResponseBytesAlreadySeen] = '\0';
#ifdef DEBUG_RECEIVED
envir() << "Received " << newBytesRead << " new bytes of response data.\n";
#endif
unsigned numExtraBytesAfterResponse = 0;
Boolean responseSuccess = False; // by default
// Each iteration of this outer loop handles one complete response; see the loop condition at the end.
do {
// Data was read OK. Look through the data that we've read so far, to see if it contains <CR><LF><CR><LF>.
// (If not, wait for more data to arrive.)
Boolean endOfHeaders = False;
char const *ptr = fResponseBuffer;
if(fResponseBytesAlreadySeen > 3) {
char const *const ptrEnd = &fResponseBuffer[fResponseBytesAlreadySeen - 3];
while(ptr < ptrEnd) {
if(*ptr++ == '\r' && *ptr++ == '\n' && *ptr++ == '\r' && *ptr++ == '\n') {
// This is it
endOfHeaders = True;
break;
}
}
}
if(!endOfHeaders) { return; } // subsequent reads will be needed to get the complete response
// Now that we have the complete response headers (ending with <CR><LF><CR><LF>), parse them to get the response code, CSeq,
// and various other header parameters. To do this, we first make a copy of the received header data, because we'll be
// modifying it by adding '\0' bytes.
char *headerDataCopy;
unsigned responseCode = 200;
char const *responseStr = NULL;
RequestRecord *foundRequest = NULL;
char const *sessionParamsStr = NULL;
char const *transportParamsStr = NULL;
char const *scaleParamsStr = NULL;
char const *speedParamsStr = NULL;
char const *rangeParamsStr = NULL;
char const *rtpInfoParamsStr = NULL;
char const *wwwAuthenticateParamsStr = NULL;
char const *publicParamsStr = NULL;
char *bodyStart = NULL;
unsigned numBodyBytes = 0;
responseSuccess = False;
// Any "break" inside this inner block leaves "responseSuccess" False and falls through to the cleanup code below it.
do {
headerDataCopy = new char[responseBufferSize];
strncpy(headerDataCopy, fResponseBuffer, fResponseBytesAlreadySeen);
headerDataCopy[fResponseBytesAlreadySeen] = '\0';
char *lineStart;
char *nextLineStart = headerDataCopy;
do {
lineStart = nextLineStart;
nextLineStart = getLine(lineStart);
} while(lineStart[0] == '\0' && nextLineStart != NULL); // skip over any blank lines at the start
if(!parseResponseCode(lineStart, responseCode, responseStr)) {
// This does not appear to be a RTSP response; perhaps it's a RTSP request instead?
handleIncomingRequest();
break; // we're done with this data
}
// Scan through the headers, handling the ones that we're interested in:
Boolean reachedEndOfHeaders;
unsigned cseq = 0;
unsigned contentLength = 0;
while(1) {
reachedEndOfHeaders = True; // by default; may get changed below
lineStart = nextLineStart;
if(lineStart == NULL) { break; }
nextLineStart = getLine(lineStart);
if(lineStart[0] == '\0') { break; } // this is a blank line
reachedEndOfHeaders = False;
char const *headerParamsStr;
if(checkForHeader(lineStart, "CSeq:", 5, headerParamsStr)) {
if(sscanf(headerParamsStr, "%u", &cseq) != 1 || cseq <= 0) {
envir().setResultMsg("Bad \"CSeq:\" header: \"", lineStart, "\"");
break;
}
// Find the handler function for "cseq":
RequestRecord *request;
while((request = fRequestsAwaitingResponse.dequeue()) != NULL) {
if(request->cseq() < cseq) { // assumes that the CSeq counter will never wrap around
// We never received (and will never receive) a response for this handler, so delete it:
if(fVerbosityLevel >= 1 && strcmp(request->commandName(), "POST") != 0) {
envir() << "WARNING: The server did not respond to our \"" << request->commandName() << "\" request (CSeq: "
<< request->cseq() << "). The server appears to be buggy (perhaps not handling pipelined requests properly).\n";
}
delete request;
} else if(request->cseq() == cseq) {
// This is the handler that we want. Remove its record, but remember it, so that we can later call its handler:
foundRequest = request;
break;
} else { // request->cseq() > cseq
// No handler was registered for this response, so ignore it.
break;
}
}
} else if(checkForHeader(lineStart, "Content-Length:", 15, headerParamsStr)) {
if(sscanf(headerParamsStr, "%u", &contentLength) != 1) {
envir().setResultMsg("Bad \"Content-Length:\" header: \"", lineStart, "\"");
break;
}
} else if(checkForHeader(lineStart, "Content-Base:", 13, headerParamsStr)) {
setBaseURL(headerParamsStr);
// The following branches have empty bodies: a successful checkForHeader() call is what records the header's
// parameter string in the corresponding "...ParamsStr" variable.
} else if(checkForHeader(lineStart, "Session:", 8, sessionParamsStr)) {
} else if(checkForHeader(lineStart, "Transport:", 10, transportParamsStr)) {
} else if(checkForHeader(lineStart, "Scale:", 6, scaleParamsStr)) {
} else if(checkForHeader(lineStart, "Speed:",
// NOTE: Should you feel the need to modify this code,
6,
// please first email the "live-devel" mailing list
speedParamsStr
// (see http://live555.com/liveMedia/faq.html#mailing-list-address for details),
)) {
// to check whether your proposed modification is appropriate/correct,
} else if(checkForHeader(lineStart, "Range:",
// and, if so, whether instead it could be included in
6,
// a future release of the "LIVE555 Streaming Media" software,
rangeParamsStr
// so that other projects that use the code could benefit (not just your own project).
)) {
} else if(checkForHeader(lineStart, "RTP-Info:", 9, rtpInfoParamsStr)) {
} else if(checkForHeader(lineStart, "WWW-Authenticate:", 17, headerParamsStr)) {
// If we've already seen a "WWW-Authenticate:" header, then we replace it with this new one only if
// the new one specifies "Digest" authentication:
if(wwwAuthenticateParamsStr == NULL || _strncasecmp(headerParamsStr, "Digest", 6) == 0) {
wwwAuthenticateParamsStr = headerParamsStr;
}
} else if(checkForHeader(lineStart, "Public:", 7, publicParamsStr)) {
} else if(checkForHeader(lineStart, "Allow:", 6, publicParamsStr)) {
// Note: we accept "Allow:" instead of "Public:", so that "OPTIONS" requests made to HTTP servers will work.
} else if(checkForHeader(lineStart, "Location:", 9, headerParamsStr)) {
setBaseURL(headerParamsStr);
} else if(checkForHeader(lineStart, "com.ses.streamID:", 17, headerParamsStr)) {
// Replace the tail of the 'base URL' with the value of this header parameter:
char *oldBaseURLTail = strrchr(fBaseURL, '/');
if(oldBaseURLTail != NULL) {
// The "8" below is the length of the literal "/stream=" written by the sprintf():
unsigned newBaseURLLen
= (oldBaseURLTail - fBaseURL) + 8 + strlen(headerParamsStr);
char *newBaseURL = new char[newBaseURLLen + 1];
// Note: We couldn't use "asprintf()", because some compilers don't support it
sprintf(newBaseURL, "%.*s/stream=%s",
(int)(oldBaseURLTail - fBaseURL), fBaseURL, headerParamsStr);
setBaseURL(newBaseURL);
delete[] newBaseURL;
}
} else if(checkForHeader(lineStart, "Connection:", 11, headerParamsStr)) {
if(fTunnelOverHTTPPortNum == 0 && _strncasecmp(headerParamsStr, "Close", 5) == 0) {
resetTCPSockets();
}
}
}
if(!reachedEndOfHeaders) { break; } // an error occurred
if(foundRequest == NULL) {
// Hack: The response didn't have a "CSeq:" header; assume it's for our most recent request:
if(!cseq) {
foundRequest = fRequestsAwaitingResponse.dequeue();
} else {
// The response carries a "CSeq:" that matches none of our pending requests. Hand it (with the raw buffer) to
// incomingDataHandler2() - presumably so that it reaches the other client sharing this link (TODO confirm).
incomingDataHandler2(cseq, fResponseBuffer);
break;
}
}
// If we saw a "Content-Length:" header, then make sure that we have the amount of data that it specified:
unsigned bodyOffset = nextLineStart == NULL ? fResponseBytesAlreadySeen : nextLineStart - headerDataCopy;
bodyStart = &fResponseBuffer[bodyOffset];
numBodyBytes = fResponseBytesAlreadySeen - bodyOffset;
if(contentLength > numBodyBytes) {
// We need to read more data. First, make sure we have enough space for it:
unsigned numExtraBytesNeeded = contentLength - numBodyBytes;
unsigned remainingBufferSize = responseBufferSize - fResponseBytesAlreadySeen;
if(numExtraBytesNeeded > remainingBufferSize) {
char tmpBuf[200];
sprintf(tmpBuf, "Response buffer size (%d) is too small for \"Content-Length:\" %d (need a buffer size of >= %d bytes\n",
responseBufferSize, contentLength, fResponseBytesAlreadySeen + numExtraBytesNeeded);
envir().setResultMsg(tmpBuf);
break;
}
if(fVerbosityLevel >= 1) {
envir() << "Have received " << fResponseBytesAlreadySeen << " total bytes of a "
<< (foundRequest != NULL ? foundRequest->commandName() : "(unknown)")
<< " RTSP response; awaiting " << numExtraBytesNeeded << " bytes more.\n";
}
delete[] headerDataCopy;
if(foundRequest != NULL) { fRequestsAwaitingResponse.putAtHead(foundRequest); } // put our request record back; we need it again
return; // We need to read more data
}
// We now have a complete response (including all bytes specified by the "Content-Length:" header, if any).
char *responseEnd = bodyStart + contentLength;
numExtraBytesAfterResponse = &fResponseBuffer[fResponseBytesAlreadySeen] - responseEnd;
// Temporarily terminate the buffer at the end of this response, so that only this response gets logged:
char saved = *responseEnd;
*responseEnd = '\0';
// Successful "GET_PARAMETER" responses are logged at debug level; every other response at info level:
if(responseCode == 200 && foundRequest != NULL && strcmp(foundRequest->commandName(), "GET_PARAMETER") == 0) {
RTSP_DBUG("$$$$$$$$$$$$ProxyRtspClient[%ld] Received a complete %s response: \n%s",
IFLY_GetThreadId(), foundRequest != NULL ? foundRequest->commandName() : "(unknown)", fResponseBuffer);
} else {
RTSP_INFO("$$$$$$$$$$$$ProxyRtspClient[%ld] Received a complete %s response: \n%s",
IFLY_GetThreadId(), foundRequest != NULL ? foundRequest->commandName() : "(unknown)", fResponseBuffer);
}
if(numExtraBytesAfterResponse > 0) {
RTSP_INFO("(plus %d additional bytes)\n", numExtraBytesAfterResponse);
}
*responseEnd = saved;
if(foundRequest != NULL) {
Boolean needToResendCommand = False; // by default...
if(responseCode == 200) {
// Do special-case response handling for some commands:
if(strcmp(foundRequest->commandName(), "SETUP") == 0) {
if(!handleSETUPResponse(*foundRequest->subsession(), sessionParamsStr, transportParamsStr, foundRequest->booleanFlags() & 0x1)) { break; }
} else if(strcmp(foundRequest->commandName(), "PLAY") == 0) {
if(!handlePLAYResponse(foundRequest->session(), foundRequest->subsession(), scaleParamsStr, speedParamsStr, rangeParamsStr, rtpInfoParamsStr)) { break; }
} else if(strcmp(foundRequest->commandName(), "TEARDOWN") == 0) {
if(!handleTEARDOWNResponse(*foundRequest->session(), *foundRequest->subsession())) { break; }
} else if(strcmp(foundRequest->commandName(), "GET_PARAMETER") == 0) {
if(!handleGET_PARAMETERResponse(foundRequest->contentStr(), bodyStart, responseEnd)) { break; }
}
} else if(responseCode == 401 && handleAuthenticationFailure(wwwAuthenticateParamsStr)) {
// We need to resend the command, with an "Authorization:" header:
needToResendCommand = True;
if(strcmp(foundRequest->commandName(), "GET") == 0) {
// Note: If a HTTP "GET" command (for RTSP-over-HTTP tunneling) returns "401 Unauthorized", then we resend it
// (with an "Authorization:" header), just as we would for a RTSP command. However, we do so using a new TCP connection,
// because some servers close the original connection after returning the "401 Unauthorized".
resetTCPSockets(); // forces the opening of a new connection for the resent command
}
} else if(responseCode == 301 || responseCode == 302) { // redirection
resetTCPSockets(); // because we need to connect somewhere else next
needToResendCommand = True;
}
if(needToResendCommand) {
resetResponseBuffer();
(void)resendCommand(foundRequest);
delete[] headerDataCopy;
return; // without calling our response handler; the response to the resent command will do that
}
}
responseSuccess = True;
} while(0);
// If we have a handler function for this response, call it.
// But first, reset our response buffer, in case the handler goes to the event loop, and we end up getting called recursively:
if(numExtraBytesAfterResponse > 0) {
// An unusual case; usually due to having received pipelined responses. Move the extra bytes to the front of the buffer:
char *responseEnd = &fResponseBuffer[fResponseBytesAlreadySeen - numExtraBytesAfterResponse];
// But first: A hack to save a copy of the response 'body', in case it's needed below for "resultString":
numBodyBytes -= numExtraBytesAfterResponse;
if(numBodyBytes > 0) {
char saved = *responseEnd;
*responseEnd = '\0';
bodyStart = strDup(bodyStart);
*responseEnd = saved;
}
memmove(fResponseBuffer, responseEnd, numExtraBytesAfterResponse);
fResponseBytesAlreadySeen = numExtraBytesAfterResponse;
fResponseBufferBytesLeft = responseBufferSize - numExtraBytesAfterResponse;
fResponseBuffer[numExtraBytesAfterResponse] = '\0';
} else {
resetResponseBuffer();
}
if(foundRequest != NULL && foundRequest->handler() != NULL) {
int resultCode;
char *resultString;
if(responseSuccess) {
if(responseCode == 200) {
resultCode = 0;
resultString = numBodyBytes > 0 ? strDup(bodyStart) : strDup(publicParamsStr);
// Note: The "strDup(bodyStart)" call assumes that the body is encoded without interior '\0' bytes
} else {
resultCode = responseCode;
resultString = strDup(responseStr);
envir().setResultMsg(responseStr);
}
(*foundRequest->handler())(this, resultCode, resultString);
} else {
// An error occurred parsing the response, so call the handler, indicating an error:
handleRequestError(foundRequest);
}
}
delete foundRequest;
delete[] headerDataCopy;
// "bodyStart" was replaced by a heap copy above only when both of these conditions held, so free it only then:
if(numExtraBytesAfterResponse > 0 && numBodyBytes > 0) { delete[] bodyStart; }
// Go around again only if the response just handled was OK and was followed by more (pipelined) data:
} while(numExtraBytesAfterResponse > 0 && responseSuccess);
}
// Reports a failed request to its response handler (if it has one).
// The handler receives the negated errno from our environment - or, when that is zero, a generic negated
// "not connected" code - together with a copy of the environment's current result message.
void RTSPClient::handleRequestError(RequestRecord *request)
{
    // The generic code to fall back on when no errno is available:
#if defined(__WIN32__) || defined(_WIN32) || defined(_QNX4)
    int const notConnectedCode = -WSAENOTCONN;
#else
    int const notConnectedCode = -ENOTCONN;
#endif

    int const negatedErrno = -envir().getErrno();
    int const resultCode = (negatedErrno != 0) ? negatedErrno : notConnectedCode;

    if(request->handler() == NULL) {
        return; // nobody to notify
    }
    (*request->handler())(this, resultCode, strDup(envir().getResultMsg()));
}
// Handles data in "fResponseBuffer" that did not parse as a RTSP response, by treating it as an incoming RTSP request.
//  - "REGISTER" and "DEREGISTER" are answered with "200 OK". If the request's "Transport:" header yields a proxy URL
//    suffix, the request is also passed to handleRegister(), together with the full URL from the request line.
//  - Every other command is answered with "405 Method Not Allowed".
// The reply is written directly to "fOutputSocketNum". Data that cannot be parsed as a request is silently ignored.
void RTSPClient::handleIncomingRequest()
{
    // Parse the request string into command name and 'CSeq':
    char cmdName[RTSP_PARAM_STRING_MAX] = {0};
    char urlPreSuffix[RTSP_PARAM_STRING_MAX] = {0};
    char urlSuffix[RTSP_PARAM_STRING_MAX] = {0};
    char cseq[RTSP_PARAM_STRING_MAX] = {0};
    char sessionId[RTSP_PARAM_STRING_MAX] = {0};
    unsigned contentLength = 0;
    Boolean reuseConnection, deliverViaTCP;
    char *proxyURLSuffix = NULL;
    if(!parseRTSPRequestString(fResponseBuffer, fResponseBytesAlreadySeen,
                               cmdName, sizeof cmdName,
                               urlPreSuffix, sizeof urlPreSuffix,
                               urlSuffix, sizeof urlSuffix,
                               cseq, sizeof cseq,
                               sessionId, sizeof sessionId,
                               contentLength)) {
        return;
    }

    RTSP_INFO("###RtspClient[%ld] received incoming RTSP request: \n%s\n", IFLY_GetThreadId(), fResponseBuffer);
    char tmpBuf[2 * RTSP_PARAM_STRING_MAX] = {0};
    char *requestURL = NULL;
    if(strcmp(cmdName, "REGISTER") == 0 || strcmp(cmdName, "DEREGISTER") == 0) {
        // Both commands get the same reply; "dateHeader()" supplies the "Date:" header line:
        snprintf((char *)tmpBuf, sizeof tmpBuf,
                 "RTSP/1.0 200 OK\r\n"
                 "CSeq: %s\r\n"
                 "%s\r\n",
                 cseq,
                 dateHeader());
        parseTransportHeaderForREGISTER((const char *)fResponseBuffer, reuseConnection, deliverViaTCP, proxyURLSuffix);
        if(proxyURLSuffix != NULL) {
            // handleRegister() needs the entire URL, so re-scan the request line for it.
            // The URL comes from the network and can be arbitrarily long, so it must not be scanned into a
            // fixed-size array: use a buffer as large as the request text itself, which "%s" cannot overrun.
            requestURL = new char[strlen(fResponseBuffer) + 1];
            if(sscanf((char *)fResponseBuffer, "%*s %s", requestURL) == 1) {
                handleRegister(cmdName, proxyURLSuffix, requestURL, fOutputSocketNum);
            }
        }
    } else {
        snprintf((char *)tmpBuf, sizeof tmpBuf,
                 "RTSP/1.0 405 Method Not Allowed\r\nCSeq: %s\r\n\r\n", cseq);
    }
    RTSP_INFO("###RtspClient[%ld] sending incoming RTSP response: \n%s\n", IFLY_GetThreadId(), tmpBuf);
    send(fOutputSocketNum, tmpBuf, strlen(tmpBuf), 0);
    delete[] requestURL;
    delete[] proxyURLSuffix;
}
最重要的就是上面這些吧!其他可能還有一些地方就自己去調試吧!
至于上面這種方法是否合适你的應用場景,我就不得而知了。