天天看点

基于rtsp协议实现媒体资源发布或者视频直播

互联网直播业务,除私有协议外,大多采用rtmp协议进行媒体发布或推流,而很少选择使用rtsp协议推流和发布;而rtsp协议也是一个很好的协议,可以分别支持rtp over udp和 rtp over rtsp,媒体控制更加灵活。下面简单介绍一下如何采用rtsp协议进行推流或媒体资源发布。

1、rtsp支持标准的推流扩展,通过扩展announce消息后,可以使用rtspclient主动将媒体流发布到服务器,然后在服务器提供直播或点播服务,不过live555等大多开源项目默认并不支持该扩展;早年间,跟华为、上海贝尔等厂商对接过流媒体系统,使用过该扩展,客户端实现比较简单,这里就不多讲了;

2、基于rtsp协议实现链路复用,扩展register消息,将rtsp协议扩展成双向控制协议;

本文将会主要讲述上述第二种方式,实现rtsp链路复用,扩展支持rtsp流媒体发布的方法;基本步骤如下:

2.1、首先,客户端启动后,主动向rtsp服务器进行register注册,并携带需要发布的(1-n个)媒体资源信息;

2.2、其次,rtsp服务器收到register注册消息之后,将携带的媒体资源进行发布,并将该rtsp的tcp链路复用,作为服务器fd进行处理;

2.3、如果rtsp服务器接收到外部媒体资源访问或媒体流点播或直播,则复用之前注册过的rtsp链路,访问远端的客户端进行拉流;

总体来说,就是复用rtsp协议的tcp长连接,进行媒体资源发布、解决拉流时的NAT穿越问题。通信流程如下:

基于rtsp协议实现媒体资源发布或者视频直播

实际上客户端运行的是一个rtspServer,服务器端运行的是一个rtspProxy;双方可以始终保持这个rtsp的tcp长连接,根据需要随时点播或者直播客户端的视频流,而无需额外的NAT穿越;rtp over udp传输也不存在NAT问题。

下面上代码,live555中有一个register和deregister流程,如果不能复用rtsp链接的话,这个流程用处不大,还不如私有协议更方便;所以下面的代码就是将这个流程修改成可以满足上述要求的rtsp链路复用的业务处理逻辑。

// RTSPRegisterSender.cpp

#include "RTSPRegisterSender.hh"

#include <GroupsockHelper.hh> // for MAKE_SOCKADDR_IN

// RTSPRegisterOrDeregisterSender implementation /

RTSPRegisterOrDeregisterSender::RTSPRegisterOrDeregisterSender(UsageEnvironment &env,

                                                               char const *remoteClientNameOrAddress, portNumBits remoteClientPortNum,

                                                               Authenticator *authenticator,

                                                               int verbosityLevel, char const *applicationName)

  : RTSPClient(env, NULL, verbosityLevel, applicationName, 0, -1),

    fRemoteClientPortNum(remoteClientPortNum)

{

  // Set up a connection to the remote client.  To do this, we create a fake "rtsp://" URL for it:

  char const *fakeRTSPURLFmt = "rtsp://%s:%u/";

  unsigned fakeRTSPURLSize = strlen(fakeRTSPURLFmt) + strlen(remoteClientNameOrAddress) + 5;

  char *fakeRTSPURL = new char[fakeRTSPURLSize];

  sprintf(fakeRTSPURL, fakeRTSPURLFmt, remoteClientNameOrAddress, remoteClientPortNum);

  setBaseURL(fakeRTSPURL);

  delete[] fakeRTSPURL;

  if(authenticator != NULL) { fCurrentAuthenticator = *authenticator; }

}

// This class adds no cleanup of its own; the destructor body is intentionally empty.
RTSPRegisterOrDeregisterSender::~RTSPRegisterOrDeregisterSender() {}

// Request record shared by REGISTER and DEREGISTER.
// Keeps private copies of the URL and of the proxy URL suffix; when no suffix is supplied,
// the value returned by GetLocalMac() is copied instead.
RTSPRegisterOrDeregisterSender::RequestRecord_REGISTER_or_DEREGISTER::RequestRecord_REGISTER_or_DEREGISTER(
    unsigned cseq, char const *cmdName,
    RTSPClient::responseHandler *rtspResponseHandler,
    char const *rtspURLToRegisterOrDeregister,
    char const *proxyURLSuffix)
  : RTSPClient::RequestRecord(cseq, cmdName, rtspResponseHandler),
    fRTSPURLToRegisterOrDeregister(strDup(rtspURLToRegisterOrDeregister)),
    fProxyURLSuffix(strDup(proxyURLSuffix != NULL ? proxyURLSuffix : GetLocalMac()))
{}

// Releases the two strings that the constructor duplicated.
RTSPRegisterOrDeregisterSender::RequestRecord_REGISTER_or_DEREGISTER::~RequestRecord_REGISTER_or_DEREGISTER()
{
  delete[] fRTSPURLToRegisterOrDeregister;
  delete[] fProxyURLSuffix;
}

// RTSPRegisterSender implementation /

RTSPRegisterSender *RTSPRegisterSender::createNew(UsageEnvironment &env,

                                                  char const *remoteClientNameOrAddress, portNumBits remoteClientPortNum,

                                                  char const *rtspURLToRegister, char const *proxyURLSuffix,

                                                  RTSPClient::responseHandler *rtspResponseHandler, Authenticator *authenticator,

                                                  Boolean requestStreamingViaTCP, Boolean reuseConnection,

                                                  int verbosityLevel, char const *applicationName)

{

  return new RTSPRegisterSender(env, remoteClientNameOrAddress, remoteClientPortNum, rtspURLToRegister,

                                rtspResponseHandler, authenticator,

                                requestStreamingViaTCP, proxyURLSuffix, reuseConnection,

                                verbosityLevel, applicationName);

}

// Default handler for the response to a REGISTER request.
// Lets the registered client's RTSP connection (and its fd) be used as an RTSP server connection, so the
// TCP link is reused: on success it makes sure the sender owns a local RTSPServer and passes the sender
// to that server via setReuseConnectionRequestHandler().
//
// rtspClient   - the sender that issued the request (expected to be a RTSPRegisterSender).
// resultCode   - 0 on success, non-zero on failure.
// resultString - text returned with the response (may be NULL); it is always delete[]d here, because
//                an RTSP response handler owns the string it is given.
void RTSPRegisterSender::defaultRegisterResponseHandler(RTSPClient *rtspClient, int resultCode, char *resultString)
{
  RTSPRegisterSender *client = (RTSPRegisterSender *)rtspClient;

  if(client == NULL || resultCode != 0) {
    // "resultString" may legitimately be NULL; never hand a NULL pointer to "%s".
    fprintf(stderr, "response error: \n%s\n", resultString != NULL ? resultString : "");
    delete[] resultString;
    return;
  }

  // The text of a successful response is not used any further.
  delete[] resultString;

  if(client->fOurServer == NULL) {
    OutPacketBuffer::maxSize = THE_MAX_OUT_PACKET_LEN;

    // Prefer the explicitly configured listen port; otherwise pick a random one.
    portNumBits listenPort = client->getLocalListenPort() ? client->getLocalListenPort() : GetRandomRtpPorts();
    client->fOurServer = RTSPServer::createNew(client->envir(), listenPort);

    if(client->fOurServer == NULL) {
      // Server creation failed: report it instead of dereferencing a NULL server below.
      fprintf(stderr, "failed to create RTSP server on port %u: %s\n",
              (unsigned)listenPort, client->envir().getResultMsg());
      return;
    }
  }

  client->fOurServer->setReuseConnectionRequestHandler(client);
}

void RTSPRegisterSender::sendRegisterCommand()

{

  // Send the "REGISTER" request:

  sendRequest(new RequestRecord_REGISTER(++fCSeq, defaultRegisterResponseHandler, NULL, True, False, NULL));

}

void RTSPRegisterSender::grabConnection(int &sock, struct sockaddr_in &remoteAddress)

{

  //sock = grabSocket();

  sock = socketNum();

  MAKE_SOCKADDR_IN(remoteAddr, fServerAddress, htons(fRemoteClientPortNum));

  remoteAddress = remoteAddr;

}

RTSPRegisterSender::RTSPRegisterSender(UsageEnvironment &env,

                                       char const *remoteClientNameOrAddress, portNumBits remoteClientPortNum, char const *rtspURLToRegister,

                                       RTSPClient::responseHandler *rtspResponseHandler, Authenticator *authenticator,

                                       Boolean requestStreamingViaTCP, char const *proxyURLSuffix, Boolean reuseConnection,

                                       int verbosityLevel, char const *applicationName)

  : fOurServer(NULL), fLocalListenPortNum(0), RTSPRegisterOrDeregisterSender(env, remoteClientNameOrAddress, remoteClientPortNum,

                                                                             authenticator, verbosityLevel, applicationName)

{

  // Send the "REGISTER" request:

  sendRequest(new RequestRecord_REGISTER(++fCSeq, rtspResponseHandler,

                                         rtspURLToRegister, reuseConnection, requestStreamingViaTCP, proxyURLSuffix));

}

// Closes the RTSP server stored in fOurServer (set by defaultRegisterResponseHandler(), NULL otherwise).
RTSPRegisterSender::~RTSPRegisterSender()
{
  Medium::close(fOurServer);
}

Boolean RTSPRegisterSender::setRequestFields(RequestRecord *request,

                                             char *&cmdURL, Boolean &cmdURLWasAllocated,

                                             char const *&protocolStr,

                                             char *&extraHeaders, Boolean &extraHeadersWereAllocated)

{

  if(strcmp(request->commandName(), "REGISTER") == 0) {

    RequestRecord_REGISTER *request_REGISTER = (RequestRecord_REGISTER *)request;

    if(request_REGISTER->rtspURLToRegister()) {

      setBaseURL(request_REGISTER->rtspURLToRegister());

    }

    cmdURL = (char *)url();

    cmdURLWasAllocated = False;

    // Generate the "Transport:" header that will contain our REGISTER-specific parameters.  This will be "extraHeaders".

    // First, generate the "proxy_url_suffix" parameter string, if any:

    char *proxyURLSuffixParameterStr;

    if(request_REGISTER->proxyURLSuffix() == NULL) {

      proxyURLSuffixParameterStr = strDup("");

    } else {

      char const *proxyURLSuffixParameterFmt = "; proxy_url_suffix=%s";

      unsigned proxyURLSuffixParameterSize = strlen(proxyURLSuffixParameterFmt)

                                             + strlen(request_REGISTER->proxyURLSuffix());

      proxyURLSuffixParameterStr = new char[proxyURLSuffixParameterSize];

      sprintf(proxyURLSuffixParameterStr, proxyURLSuffixParameterFmt, request_REGISTER->proxyURLSuffix());

    }

    char const *transportHeaderFmt = "Transport: %spreferred_delivery_protocol=%s%s\r\n";

    unsigned transportHeaderSize = strlen(transportHeaderFmt) + 100 + strlen(proxyURLSuffixParameterStr);

    char *transportHeaderStr = new char[transportHeaderSize];

    sprintf(transportHeaderStr, transportHeaderFmt,

            request_REGISTER->reuseConnection() ? "reuse_connection; " : "",

            request_REGISTER->requestStreamingViaTCP() ? "interleaved" : "udp",

            proxyURLSuffixParameterStr);

    delete[] proxyURLSuffixParameterStr;

    extraHeaders = transportHeaderStr;

    extraHeadersWereAllocated = True;

    return True;

  } else {

    return RTSPClient::setRequestFields(request, cmdURL, cmdURLWasAllocated, protocolStr, extraHeaders, extraHeadersWereAllocated);

  }

}

RTSPRegisterSender::RequestRecord_REGISTER::RequestRecord_REGISTER(unsigned cseq, RTSPClient::responseHandler *rtspResponseHandler,

                                                                   char const *rtspURLToRegister,

                                                                   Boolean reuseConnection, Boolean requestStreamingViaTCP, char const *proxyURLSuffix)

  : RTSPRegisterOrDeregisterSender::RequestRecord_REGISTER_or_DEREGISTER(cseq, "REGISTER", rtspResponseHandler, rtspURLToRegister, proxyURLSuffix),

    fReuseConnection(reuseConnection), fRequestStreamingViaTCP(requestStreamingViaTCP)

{

}

// Nothing to release at this level; the destructor body is intentionally empty.
RTSPRegisterSender::RequestRecord_REGISTER::~RequestRecord_REGISTER() {}

// RTSPDeregisterSender implementation /

RTSPDeregisterSender *RTSPDeregisterSender::createNew(UsageEnvironment &env,

                                                      char const *remoteClientNameOrAddress, portNumBits remoteClientPortNum, char const *rtspURLToDeregister,

                                                      RTSPClient::responseHandler *rtspResponseHandler, Authenticator *authenticator,

                                                      char const *proxyURLSuffix, int verbosityLevel, char const *applicationName)

{

  return new RTSPDeregisterSender(env, remoteClientNameOrAddress, remoteClientPortNum, rtspURLToDeregister,

                                  rtspResponseHandler, authenticator,

                                  proxyURLSuffix, verbosityLevel, applicationName);

}

void RTSPDeregisterSender::sendDeregisterCommand()

{

  // Send the "REGISTER" request:

  sendRequest(new RequestRecord_DEREGISTER(++fCSeq, RTSPRegisterSender::defaultRegisterResponseHandler, NULL, NULL));

}

RTSPDeregisterSender::RTSPDeregisterSender(UsageEnvironment &env,

                                           char const *remoteClientNameOrAddress, portNumBits remoteClientPortNum, char const *rtspURLToDeregister,

                                           RTSPClient::responseHandler *rtspResponseHandler, Authenticator *authenticator,

                                           char const *proxyURLSuffix,

                                           int verbosityLevel, char const *applicationName)

  : RTSPRegisterOrDeregisterSender(env, remoteClientNameOrAddress, remoteClientPortNum, authenticator, verbosityLevel, applicationName)

{

  // Send the "DEREGISTER" request:

  sendRequest(new RequestRecord_DEREGISTER(++fCSeq, rtspResponseHandler, rtspURLToDeregister, proxyURLSuffix));

}

// Nothing to release at this level; the destructor body is intentionally empty.
RTSPDeregisterSender::~RTSPDeregisterSender() {}

Boolean RTSPDeregisterSender::setRequestFields(RequestRecord *request,

                                               char *&cmdURL, Boolean &cmdURLWasAllocated,

                                               char const *&protocolStr,

                                               char *&extraHeaders, Boolean &extraHeadersWereAllocated)

{

  if(strcmp(request->commandName(), "DEREGISTER") == 0) {

    RequestRecord_DEREGISTER *request_DEREGISTER = (RequestRecord_DEREGISTER *)request;

    if(request_DEREGISTER->rtspURLToDeregister()) {

      setBaseURL(request_DEREGISTER->rtspURLToDeregister());

    }

    cmdURL = (char *)url();

    cmdURLWasAllocated = False;

    // Generate the "Transport:" header that will contain our DEREGISTER-specific parameters.  This will be "extraHeaders".

    // First, generate the "proxy_url_suffix" parameter string, if any:

    char *proxyURLSuffixParameterStr;

    if(request_DEREGISTER->proxyURLSuffix() == NULL) {

      proxyURLSuffixParameterStr = strDup("");

    } else {

      char const *proxyURLSuffixParameterFmt = "proxy_url_suffix=%s";

      unsigned proxyURLSuffixParameterSize = strlen(proxyURLSuffixParameterFmt)

                                             + strlen(request_DEREGISTER->proxyURLSuffix());

      proxyURLSuffixParameterStr = new char[proxyURLSuffixParameterSize];

      sprintf(proxyURLSuffixParameterStr, proxyURLSuffixParameterFmt, request_DEREGISTER->proxyURLSuffix());

    }

    char const *transportHeaderFmt = "Transport: %s\r\n";

    unsigned transportHeaderSize = strlen(transportHeaderFmt) + strlen(proxyURLSuffixParameterStr);

    char *transportHeaderStr = new char[transportHeaderSize];

    sprintf(transportHeaderStr, transportHeaderFmt,

            proxyURLSuffixParameterStr);

    delete[] proxyURLSuffixParameterStr;

    extraHeaders = transportHeaderStr;

    extraHeadersWereAllocated = True;

    return True;

  } else {

    return RTSPClient::setRequestFields(request, cmdURL, cmdURLWasAllocated, protocolStr, extraHeaders, extraHeadersWereAllocated);

  }

}

RTSPDeregisterSender::RequestRecord_DEREGISTER::RequestRecord_DEREGISTER(unsigned cseq, RTSPClient::responseHandler *rtspResponseHandler,

                                                                         char const *rtspURLToDeregister,

                                                                         char const *proxyURLSuffix)

  : RTSPRegisterOrDeregisterSender::RequestRecord_REGISTER_or_DEREGISTER(cseq, "DEREGISTER", rtspResponseHandler, rtspURLToDeregister, proxyURLSuffix)

{

}

// Nothing to release at this level; the destructor body is intentionally empty.
RTSPDeregisterSender::RequestRecord_DEREGISTER::~RequestRecord_DEREGISTER() {}

// Stand-alone RTSP registration thread, or the server thread once the RTSP connection is being reused.

// The single RTSPRegister instance handed out by createNew(); NULL until one has been created,
// and reset to NULL when the scheduler thread finishes.
RTSPRegister *RTSPRegister::gRtspRegister = NULL;

RTSPRegister *RTSPRegister::createNew(char const *ip, portNumBits port, portNumBits extPort, char const *username,

                                      char const *password, char const *streamId)

{

  RTSPRegister *pThis = gRtspRegister;

  Authenticator *authenticator = NULL;

  if(!pThis && ip && port) {

    if(username && password) {

      authenticator = new Authenticator(username, password);

    }

    pThis = new RTSPRegister(ip, port, extPort, authenticator, streamId);

    pThis->fScheduler = BasicTaskScheduler::createNew();

    pThis->fEnv = BasicUsageEnvironment::createNew(*pThis->fScheduler);

    pThis->fRegisterClient = RTSPRegisterSender::createNew(*pThis->fEnv, ip, port, NULL, streamId,

                                                           RTSPRegisterSender::defaultRegisterResponseHandler,

                                                           authenticator);

    pThis->fRegisterClient->setLocalListenPort(extPort);

    pThis->fScheduler->rescheduleDelayedTask(pThis->fHeartbeatTask, THE_DEFAULT_HEARTBEAT_INTERVAL * 1000, registerStream, pThis);

    IFLY_CreateThread(runTaskScheduler, pThis);

    gRtspRegister = pThis;

  }

  return pThis;

}

void RTSPRegister::registerStream(void *clientdata)

{

  static int errSendCount = 0;

  RTSPRegister *pThis = (RTSPRegister *)clientdata;

  if(pThis && pThis->fRegisterClient) {

    pThis->fRegisterClient->sendRegisterCommand();

    if(pThis->fEnv && strstr(pThis->fEnv->getResultMsg(), "REGISTER")) {

      ++errSendCount;

      if(errSendCount >= THE_DEFAULT_HEARTBEAT_TIMES) {

        Medium::close(pThis->fRegisterClient);

        pThis->fRegisterClient = RTSPRegisterSender::createNew(*pThis->fEnv, pThis->fIp, pThis->fPort, NULL, pThis->fStreamId,

                                                               RTSPRegisterSender::defaultRegisterResponseHandler,

                                                               pThis->fAuthenticator);

        pThis->fRegisterClient->setLocalListenPort(pThis->fExtPort);

        pThis->fScheduler->rescheduleDelayedTask(pThis->fHeartbeatTask, THE_DEFAULT_HEARTBEAT_INTERVAL * 1000, registerStream, pThis);

      } else {

        pThis->fScheduler->rescheduleDelayedTask(pThis->fHeartbeatTask, 3 * MILLION, registerStream, pThis);

      }

    } else {

      pThis->fScheduler->rescheduleDelayedTask(pThis->fHeartbeatTask, THE_DEFAULT_HEARTBEAT_INTERVAL * 1000, registerStream, pThis);

      errSendCount = 0;

    }

  }

}

// Stores the registration parameters.  "ip" and "streamId" are copied; the authenticator pointer is
// kept as given (and deleted by the destructor).  The event-loop flag and heartbeat task start cleared.
RTSPRegister::RTSPRegister(char const *ip, portNumBits port, portNumBits extPort,
                           Authenticator *authenticator, char const *streamId)
  : fIp(strDup(ip)),
    fPort(port),
    fExtPort(extPort),
    fAuthenticator(authenticator),
    fRun(0),
    fHeartbeatTask(NULL),
    fStreamId(strDup(streamId))
{}

// Frees the copied strings and the authenticator held by this object.
RTSPRegister::~RTSPRegister()
{
  delete [] fIp;
  delete [] fStreamId;
  delete fAuthenticator;
}

// Thread entry point: runs the event loop of the RTSPRegister passed in "lpParamt" until its fRun
// flag makes doEventLoop() return, then tears everything down (sender, environment, scheduler and the
// RTSPRegister itself) and clears the global instance pointer.
THREAD_RTN RTSPRegister::runTaskScheduler(void *lpParamt)
{
  RTSPRegister *self = (RTSPRegister *)lpParamt;

  RTSP_INFO("*************create register task scheduler thread: %ld\n", IFLY_GetThreadId());

  TaskScheduler &eventLoop = self->fEnv->taskScheduler();
  eventLoop.setId(IFLY_GetThreadId());
  eventLoop.doEventLoop(&self->fRun);

  // The loop has ended: release resources in dependency order.
  Medium::close(self->fRegisterClient);
  self->fRegisterClient = NULL;

  self->fEnv->reclaim();
  self->fEnv = NULL;

  delete self->fScheduler;
  self->fScheduler = NULL;

  delete self;
  gRtspRegister = NULL;

  RTSP_INFO("*************delete register task scheduler thread: %ld\n", IFLY_GetThreadId());
  return 0;
}

void RTSPRegister::shutdown()

{

  RTSPRegister *pThis = createNew();

  if(pThis) {

    pThis->fScheduler->unscheduleDelayedTask(pThis->fHeartbeatTask);

    pThis->fRun = 1;

  }

}

//RTSPServerRegister.cpp

#include "RTSPServer.hh"

#include "RTSPCommon.hh"

#include "RTSPRegisterSender.hh"

#include "ProxyServerMediaSession.hh"

#include "GroupsockHelper.hh"

// Implementation of "RTSPServer::registerStream()": //

static void rtspRegisterResponseHandler(RTSPClient *rtspClient, int resultCode, char *resultString);  // forward

// A class that represents the state of a "REGISTER" request in progress:

class RegisterRequestRecord: public RTSPRegisterSender

{

public:

  RegisterRequestRecord(RTSPServer &ourServer, unsigned requestId,

                        char const *remoteClientNameOrAddress, portNumBits remoteClientPortNum, char const *rtspURLToRegister,

                        RTSPServer::responseHandlerForREGISTER *responseHandler, Authenticator *authenticator,

                        Boolean requestStreamingViaTCP, char const *proxyURLSuffix)

    : RTSPRegisterSender(ourServer.envir(), remoteClientNameOrAddress, remoteClientPortNum, rtspURLToRegister,

                         rtspRegisterResponseHandler, authenticator,

                         requestStreamingViaTCP, proxyURLSuffix, True,

#ifdef DEBUG

                         1,

#else

                         0,

#endif

                         NULL),

      fOurServer(ourServer), fRequestId(requestId), fResponseHandler(responseHandler)

  {

    // Add ourself to our server's 'pending REGISTER or DEREGISTER requests' table:

    ourServer.fPendingRegisterOrDeregisterRequests->Add((char const *)this, this);

  }

  virtual ~RegisterRequestRecord()

  {

    // Remove ourself from the server's 'pending REGISTER or DEREGISTER requests' hash table before we go:

    fOurServer.fPendingRegisterOrDeregisterRequests->Remove((char const *)this);

  }

  void handleResponse(int resultCode, char *resultString)

  {

    if(resultCode == 0) {

      // The "REGISTER" request succeeded, so use the still-open RTSP socket to await incoming commands from the remote endpoint:

      int sock;

      struct sockaddr_in remoteAddress;

      grabConnection(sock, remoteAddress);

      if(sock >= 0) {

        increaseSendBufferTo(envir(), sock, 50 * 1024);  // in anticipation of streaming over it

        (void)fOurServer.createNewClientConnection(sock, remoteAddress);

      }

    }

    if(fResponseHandler != NULL) {

      // Call our (REGISTER-specific) response handler now:

      (*fResponseHandler)(&fOurServer, fRequestId, resultCode, resultString);

    } else {

      // We need to delete[] "resultString" before we leave:

      delete[] resultString;

    }

    // We're completely done with the REGISTER command now, so delete ourself now:

    delete this;

  }

private:

  RTSPServer &fOurServer;

  unsigned fRequestId;

  RTSPServer::responseHandlerForREGISTER *fResponseHandler;

};

// RTSP response callback for REGISTER: forwards the response to the RegisterRequestRecord that sent
// the request ("rtspClient" is that record).
static void rtspRegisterResponseHandler(RTSPClient *rtspClient, int resultCode, char *resultString)
{
  ((RegisterRequestRecord *)rtspClient)->handleResponse(resultCode, resultString);
}

unsigned RTSPServer::registerStream(ServerMediaSession *serverMediaSession,

                                    char const *remoteClientNameOrAddress, portNumBits remoteClientPortNum,

                                    responseHandlerForREGISTER *responseHandler,

                                    char const *username, char const *password,

                                    Boolean receiveOurStreamViaTCP, char const *proxyURLSuffix)

{

  // Create a new "RegisterRequestRecord" that will send the "REGISTER" command.

  // (This object will automatically get deleted after we get a response to the "REGISTER" command, or if we're deleted.)

  Authenticator *authenticator = NULL;

  if(username != NULL) {

    if(password == NULL) { password = ""; }

    authenticator = new Authenticator(username, password);

  }

  unsigned requestId = ++fRegisterOrDeregisterRequestCounter;

  new RegisterRequestRecord(*this, requestId,

                            remoteClientNameOrAddress, remoteClientPortNum, rtspURL(serverMediaSession),

                            responseHandler, authenticator,

                            receiveOurStreamViaTCP, proxyURLSuffix);

  delete authenticator; // we can do this here because it was copied to the "RegisterRequestRecord"

  return requestId;

}

// Implementation of "RTSPServer::deregisterStream()": //

static void rtspDeregisterResponseHandler(RTSPClient *rtspClient, int resultCode, char *resultString);  // forward

// A class that represents the state of a "DEREGISTER" request in progress:

class DeregisterRequestRecord: public RTSPDeregisterSender

{

public:

  DeregisterRequestRecord(RTSPServer &ourServer, unsigned requestId,

                          char const *remoteClientNameOrAddress, portNumBits remoteClientPortNum, char const *rtspURLToDeregister,

                          RTSPServer::responseHandlerForDEREGISTER *responseHandler, Authenticator *authenticator,

                          char const *proxyURLSuffix)

    : RTSPDeregisterSender(ourServer.envir(), remoteClientNameOrAddress, remoteClientPortNum, rtspURLToDeregister,

                           rtspDeregisterResponseHandler, authenticator, proxyURLSuffix,

#ifdef DEBUG

                           1,

#else

                           0,

#endif

                           NULL),

      fOurServer(ourServer), fRequestId(requestId), fResponseHandler(responseHandler)

  {

    // Add ourself to our server's 'pending REGISTER or DEREGISTER requests' table:

    ourServer.fPendingRegisterOrDeregisterRequests->Add((char const *)this, this);

  }

  virtual ~DeregisterRequestRecord()

  {

    // Remove ourself from the server's 'pending REGISTER or DEREGISTER requests' hash table before we go:

    fOurServer.fPendingRegisterOrDeregisterRequests->Remove((char const *)this);

  }

  void handleResponse(int resultCode, char *resultString)

  {

    if(fResponseHandler != NULL) {

      // Call our (DEREGISTER-specific) response handler now:

      (*fResponseHandler)(&fOurServer, fRequestId, resultCode, resultString);

    } else {

      // We need to delete[] "resultString" before we leave:

      delete[] resultString;

    }

    // We're completely done with the DEREGISTER command now, so delete ourself now:

    delete this;

  }

private:

  RTSPServer &fOurServer;

  unsigned fRequestId;

  RTSPServer::responseHandlerForDEREGISTER *fResponseHandler;

};

// RTSP response callback for DEREGISTER: forwards the response to the DeregisterRequestRecord that
// sent the request ("rtspClient" is that record).
static void rtspDeregisterResponseHandler(RTSPClient *rtspClient, int resultCode, char *resultString)
{
  ((DeregisterRequestRecord *)rtspClient)->handleResponse(resultCode, resultString);
}

unsigned RTSPServer::deregisterStream(ServerMediaSession *serverMediaSession,

                                      char const *remoteClientNameOrAddress, portNumBits remoteClientPortNum,

                                      responseHandlerForDEREGISTER *responseHandler,

                                      char const *username, char const *password,

                                      char const *proxyURLSuffix)

{

  // Create a new "DeregisterRequestRecord" that will send the "DEREGISTER" command.

  // (This object will automatically get deleted after we get a response to the "DEREGISTER" command, or if we're deleted.)

  Authenticator *authenticator = NULL;

  if(username != NULL) {

    if(password == NULL) { password = ""; }

    authenticator = new Authenticator(username, password);

  }

  unsigned requestId = ++fRegisterOrDeregisterRequestCounter;

  new DeregisterRequestRecord(*this, requestId,

                              remoteClientNameOrAddress, remoteClientPortNum, rtspURL(serverMediaSession),

                              responseHandler, authenticator,

                              proxyURLSuffix);

  delete authenticator; // we can do this here because it was copied to the "DeregisterRequestRecord"

  return requestId;

}

// Base-class default: the custom "REGISTER"/"DEREGISTER" commands are not implemented.
// Always clears "responseStr" and returns False; both input arguments are ignored.
Boolean RTSPServer::weImplementREGISTER(char const * /*cmd*/,
                                        char const * /*proxyURLSuffix*/, char *&responseStr)
{
  responseStr = NULL;
  return False;
}

void RTSPServer::implementCmd_REGISTER(char const * ,

                                       char const * , char const * , int ,

                                       Boolean , char const * )

{

  // By default, this function is a 'noop'

}

// Special mechanism for handling our custom "REGISTER" command:

RTSPServer::RTSPClientConnection::ParamsForREGISTER::ParamsForREGISTER(char const *cmd,

                                                                       RTSPServer::RTSPClientConnection *ourConnection, char const *url, char const *urlSuffix,

                                                                       Boolean reuseConnection, Boolean deliverViaTCP, char const *proxyURLSuffix)

  : fCmd(strDup(cmd)), fOurConnection(ourConnection), fURL(strDup(url)), fURLSuffix(strDup(urlSuffix)),

    fReuseConnection(reuseConnection), fDeliverViaTCP(deliverViaTCP), fProxyURLSuffix(strDup(proxyURLSuffix))

{

}

// Frees the four strings copied by the constructor.
RTSPServer::RTSPClientConnection::ParamsForREGISTER::~ParamsForREGISTER()
{
  delete[](char *)fCmd;
  delete[] fURL;
  delete[] fURLSuffix;
  delete[] fProxyURLSuffix;
}

// Delay passed to scheduleDelayedTask() between replying to a REGISTER/DEREGISTER command and
// actually acting on it (presumably microseconds, i.e. 0.1 s, going by the macro name).
#define DELAY_USECS_AFTER_REGISTER_RESPONSE 100000

void RTSPServer::RTSPClientConnection::handleCmd_REGISTER(char const *cmd,

                                                          char const *url, char const *urlSuffix, char const *fullRequestStr,

                                                          Boolean reuseConnection, Boolean deliverViaTCP, char const *proxyURLSuffix)

{

  char *responseStr;

  if(fOurRTSPServer.weImplementREGISTER(cmd, proxyURLSuffix, responseStr)) {

    // The "REGISTER"/"DEREGISTER" command - if we implement it - may require access control:

    if(!authenticationOK(cmd, urlSuffix, fullRequestStr)) { return; }

    // We implement the "REGISTER"/"DEREGISTER" command by first replying to it, then actually

    // handling it (in a separate event-loop task, that will get called after the reply has

    // been done).

    // Hack: If we're going to reuse the command's connection for subsequent RTSP commands, then we

    // delay the actual handling of the command slightly, to make it less likely that the first

    // subsequent RTSP command (e.g., "DESCRIBE") will end up in the client's reponse buffer before

    // the socket (at the far end) gets reused for RTSP command handling.

    setRTSPResponse(responseStr == NULL ? "200 OK" : responseStr);

    delete[] responseStr;

    ++fRecursionCount;

    ParamsForREGISTER *registerParams = new ParamsForREGISTER(cmd, this, url, urlSuffix, reuseConnection, deliverViaTCP, proxyURLSuffix);

    envir().taskScheduler().scheduleDelayedTask(DELAY_USECS_AFTER_REGISTER_RESPONSE, (TaskFunc *)continueHandlingREGISTER, registerParams);

  } else if(responseStr != NULL) {

    setRTSPResponse(responseStr);

    delete[] responseStr;

    ++fRecursionCount;

    ParamsForREGISTER *registerParams = new ParamsForREGISTER("DEREGISTER", this, url, urlSuffix, reuseConnection, deliverViaTCP, proxyURLSuffix);

    envir().taskScheduler().scheduleDelayedTask(DELAY_USECS_AFTER_REGISTER_RESPONSE, (TaskFunc *)continueHandlingREGISTER, registerParams);

  } else {

    handleCmd_notSupported();

  }

}

// Delayed-task entry point: dispatches to continueHandlingREGISTER1() on the connection stored in "params".
void RTSPServer::RTSPClientConnection::continueHandlingREGISTER(ParamsForREGISTER *params)
{
  RTSPServer::RTSPClientConnection *connection = params->fOurConnection;
  connection->continueHandlingREGISTER1(params);
}

void RTSPServer::RTSPClientConnection::continueHandlingREGISTER1(ParamsForREGISTER *params)

{

  // Reuse our socket if requested:

  int socketNumToBackEndServer = params->fReuseConnection ? fClientOutputSocket : -1;

  RTSPServer *ourServer = &fOurRTSPServer; // copy the pointer now, in case we "delete this" below

  ourServer->implementCmd_REGISTER(params->fCmd,

                                   params->fURL, params->fURLSuffix, socketNumToBackEndServer,

                                   params->fDeliverViaTCP, params->fProxyURLSuffix);

  delete params;

  if(socketNumToBackEndServer >= 0) {

    // Because our socket will no longer be used by the server to handle incoming requests, we can now delete this

    // "RTSPClientConnection" object.  We do this now, in case the "implementCmd_REGISTER()" call below would also end up

    // deleting this.

    fClientInputSocket = fClientOutputSocket = -1; // so the socket doesn't get closed when we get deleted

    RTSP_INFO("Switch server connection into client connection, not real release fd[%d].\n", socketNumToBackEndServer);

    delete this;

  } else {

    --fRecursionCount;

  }

}

// RTSPServerWithREGISTERProxying implementation /

RTSPServerWithREGISTERProxying *RTSPServerWithREGISTERProxying::createNew(UsageEnvironment &env, Port ourPort,

                                                                          UserAuthenticationDatabase *authDatabase, UserAuthenticationDatabase *authDatabaseForREGISTER,

                                                                          unsigned reclamationSeconds, unsigned subThreadNum,

                                                                          Boolean streamRTPOverTCP, int verbosityLevelForProxying, int streamOutType,

                                                                          char const *backEndUsername, char const *backEndPassword)

{

  if(fSelf) {

    RTSP_WARN("RTSPServer already created in thread: %d\n", fSelf->envir().taskScheduler().getId());

  }

  int ourSocket = setUpOurSocket(env, ourPort);

  if(ourSocket == -1) {

    return NULL;

  }

  fSelf = new RTSPServerWithREGISTERProxying(env, ourSocket, ourPort,

                                             authDatabase, authDatabaseForREGISTER,

                                             reclamationSeconds, subThreadNum,

                                             streamRTPOverTCP, verbosityLevelForProxying,

                                             backEndUsername, backEndPassword);

  fSelf->streamOutType = streamOutType;

  return (RTSPServerWithREGISTERProxying *)fSelf;

}

RTSPServerWithREGISTERProxying::RTSPServerWithREGISTERProxying(UsageEnvironment &env, int ourSocket, Port ourPort,

                                                               UserAuthenticationDatabase *authDatabase, UserAuthenticationDatabase *authDatabaseForREGISTER,

                                                               unsigned reclamationSeconds, unsigned subThreadNum,

                                                               Boolean streamRTPOverTCP, int verbosityLevelForProxying,

                                                               char const *backEndUsername, char const *backEndPassword)

  : RTSPServer(env, ourSocket, ourPort, authDatabase, reclamationSeconds, subThreadNum, streamRTPOverTCP, verbosityLevelForProxying),

    fRegisteredProxyCounter(0), fAllowedCommandNames(NULL), fAuthDBForREGISTER(authDatabaseForREGISTER),

    fBackEndUsername(strDup(backEndUsername)), fBackEndPassword(strDup(backEndPassword))

{

}

// Frees the cached command-name list and the copied back-end credentials.
RTSPServerWithREGISTERProxying::~RTSPServerWithREGISTERProxying()
{
  delete[] fAllowedCommandNames;
  delete[] fBackEndUsername;
  delete[] fBackEndPassword;
}

char const *RTSPServerWithREGISTERProxying::allowedCommandNames()

{

  if(fAllowedCommandNames == NULL) {

    char const *baseAllowedCommandNames = RTSPServer::allowedCommandNames();

    char const *newAllowedCommandName = ", REGISTER, DEREGISTER";

    fAllowedCommandNames = new char[strlen(baseAllowedCommandNames) + strlen(newAllowedCommandName) + 1];

    sprintf(fAllowedCommandNames, "%s%s", baseAllowedCommandNames, newAllowedCommandName);

  }

  return fAllowedCommandNames;

}

Boolean RTSPServerWithREGISTERProxying::weImplementREGISTER(char const *cmd,

                                                            char const *proxyURLSuffix, char *&responseStr)

{

  // First, check whether we have already proxied a stream as "proxyURLSuffix":

  if(proxyURLSuffix != NULL) {

    ServerMediaSession *sms = lookupServerMediaSession(proxyURLSuffix);

    if((strcmp(cmd, "REGISTER") == 0 && sms != NULL) ||

        (strcmp(cmd, "DEREGISTER") == 0 && sms == NULL)) {

      responseStr = strDup("451 Invalid parameter");

      return False;

    }

  }

  // Otherwise, we will implement it:

  responseStr = NULL;

  return True;

}

void RTSPServerWithREGISTERProxying::implementCmd_REGISTER(char const *cmd,

                                                           char const *url, char const * , int socketToRemoteServer,

                                                           Boolean deliverViaTCP, char const *proxyURLSuffix)

{

  // Continue setting up proxying for the specified URL.

  // By default:

  //    - We use "registeredProxyStream-N" as the (front-end) stream name (ignoring the back-end stream's 'urlSuffix'),

  //      unless "proxyURLSuffix" is non-NULL (in which case we use that)

  //    - There is no 'username' and 'password' for the back-end stream.  (Thus, access-controlled back-end streams will fail.)

  //    - If "fStreamRTPOverTCP" is True, then we request delivery over TCP, regardless of the value of "deliverViaTCP".

  //      (Otherwise, if "fStreamRTPOverTCP" is False, we use the value of "deliverViaTCP" to decide this.)

  // To change this default behavior, you will need to subclass "RTSPServerWithREGISTERProxying", and reimplement this function.

  ProxyServerMediaSession *sms = NULL;

  char const *proxyStreamName = NULL;

  char proxyStreamNameBuf[100] = {0};

  struct sockaddr_in name = {0};

  int len = sizeof(struct sockaddr);

  if(proxyURLSuffix == NULL) {

    sprintf(proxyStreamNameBuf, "registeredProxyStream-%u", ++fRegisteredProxyCounter);

    proxyStreamName = proxyStreamNameBuf;

  } else {

    proxyStreamName = proxyURLSuffix;

  }

  sms = (ProxyServerMediaSession *)lookupServerMediaSession(proxyStreamName);

  if(strcmp(cmd, "REGISTER") == 0 && !sms) {

    portNumBits tunnelOverHTTPPortNum = getStreamRTPOverTCP() ? (portNumBits)(~0) : 0;

    // We don't support streaming from the back-end via RTSP/RTP/RTCP-over-HTTP; only via RTP/RTCP-over-TCP or RTP/RTCP-over-UDP

    sms = ProxyServerMediaSession::createNew(envir(), this, NULL, proxyStreamName, fBackEndUsername, fBackEndPassword,

                                             tunnelOverHTTPPortNum, getVerbosityLevel(), socketToRemoteServer);

    sms->incrementReferenceCount();

    addServerMediaSession(sms);

    // (Regardless of the verbosity level) announce the fact that we're proxying this new stream, and the URL to use to access it:

    if(sms->fProxyRTSPClient && !getpeername(sms->fProxyRTSPClient->socketNum(), (struct sockaddr *)&name, (socklen_t *)&len)) {

      char *urlPrefix = rtspURLPrefix(sms->fProxyRTSPClient->socketNum());

      RTSP_INFO("Proxying the registered rtspserver: rtsp://%s/streamId/real, by the url: %sstreamId/real?proxyid=%s\n",

                inet_ntoa(name.sin_addr), urlPrefix, sms->streamName());

      delete[] urlPrefix;

    }

  } else if(strcmp(cmd, "DEREGISTER") == 0 && sms) { // "DEREGISTER"

    sms->decrementReferenceCount();

    deleteServerMediaSession(sms);

    envir().taskScheduler().disableBackgroundHandling(socketToRemoteServer);

    ::closeSocket(socketToRemoteServer);

    RTSP_INFO("closed socket[%d]\n", socketToRemoteServer);

    GenericMediaServer::ServerMediaSessionIterator *iter = new GenericMediaServer::ServerMediaSessionIterator(*this);

    while((sms = (ProxyServerMediaSession *)iter->next()) != NULL) {

      if(sms->fProxyRTSPClient && !sms->fProxyRTSPClient->url()

          && !getpeername(sms->fProxyRTSPClient->socketNum(), (struct sockaddr *)&name, (socklen_t *)&len)) {

        char *urlPrefix = rtspURLPrefix(sms->fProxyRTSPClient->socketNum());

        RTSP_INFO("Proxying the registered rtspserver: rtsp://%s/streamId/real, by the url: %sstreamId/real?proxyid=%s\n",

                  inet_ntoa(name.sin_addr), urlPrefix, sms->streamName());

        delete[] urlPrefix;

      }

    }

    delete iter;

  }

}

// Selects the authentication database used to check "cmdName":
// "REGISTER" is checked against "fAuthDBForREGISTER"; every other command
// (including "DEREGISTER") is left to the base-class implementation.
UserAuthenticationDatabase *RTSPServerWithREGISTERProxying::getAuthenticationDatabaseForCommand(char const *cmdName)
{
  Boolean const isRegister = strcmp(cmdName, "REGISTER") == 0;

  return isRegister ? fAuthDBForREGISTER
                    : RTSPServer::getAuthenticationDatabaseForCommand(cmdName);
}

main函数中的调用方式如下:

  // Excerpt from the client's main(): the variables used below are declared earlier, outside this excerpt.

  // Create an Authenticator only when credentials are required; otherwise "ourAuthenticator" keeps its earlier value.
  if(needAuth) {

    ourAuthenticator = new Authenticator(username, password);

  }

  // Create the deregister sender first, then the register sender, both for the same "url"/"streamId".
  // NOTE(review): presumably this clears a stale registration left by an earlier run - confirm.
  RTSPDeregisterSender::createNew(*env, rtspIp, rtspPort, url,

                                  RTSPRegisterSender::defaultRegisterResponseHandler,

                                  ourAuthenticator, streamId, verbosityLevel, name);

  // We have the command-line arguments.  Send the command:

  RTSPRegisterSender::createNew(*env, rtspIp, rtspPort, url, streamId,

                                RTSPRegisterSender::defaultRegisterResponseHandler,

                                ourAuthenticator, streamRTPOverTCP, True,

                                verbosityLevel, name);

  // Note: This object will be deleted later, by the response handler

  env->taskScheduler().doEventLoop(); // does not return

以上是客户端代码,下面是几个服务器需要修改的地方:

在int RTSPServer::RTSPClientConnection::handleRequestBytes(int newBytesRead)中补充以下代码:

………………

      // Commands that only make sense inside an existing client session:
      } else if(strcmp(cmdName, "TEARDOWN") == 0

                || strcmp(cmdName, "PLAY") == 0

                || strcmp(cmdName, "PAUSE") == 0

                || strcmp(cmdName, "GET_PARAMETER") == 0

                || strcmp(cmdName, "SET_PARAMETER") == 0) {

        if(clientSession != NULL) {

          clientSession->handleCmd_withinSession(this, cmdName, urlPreSuffix, urlSuffix, (char const *)fRequestBuffer);

        } else {

          handleCmd_sessionNotFound();

        }

      } else if(strcmp(cmdName, "REGISTER") == 0 || strcmp(cmdName, "DEREGISTER") == 0) {

        // Because - unlike other commands - an implementation of this command needs

        // the entire URL, we re-parse the command to get it:

        // ("url" is allocated as large as the whole request buffer, so the "%s" below cannot overflow it.)
        char *url = strDupSize((char *)fRequestBuffer);

        if(sscanf((char *)fRequestBuffer, "%*s %s", url) == 1) {

          // Check for special command-specific parameters in a "Transport:" header:

          Boolean reuseConnection, deliverViaTCP;

          char *proxyURLSuffix = NULL;

          parseTransportHeaderForREGISTER((const char *)fRequestBuffer, reuseConnection, deliverViaTCP, proxyURLSuffix);

          // NOTE(review): "proxyURLSuffix" is still NULL here if the parser above did not set it -
          // confirm that lookupServerMediaSession() tolerates a NULL name.
          ServerMediaSession *sms = fOurRTSPServer.lookupServerMediaSession(proxyURLSuffix);

          if(sms && sms->envir().taskScheduler().getId() != IFLY_GetThreadId()) {

            // The network link was re-established while the session was in use, so this connection has to be
            // handled by the thread whose scheduler owns the existing session.  The switch made here only
            // takes effect for the next request; the current one is answered with "session not found".

            RTSP_INFO("ClientSocket[%d] Change SubThread[%ld->%ld] Handle\n",

                      fOurSocket, IFLY_GetThreadId(), sms->envir().taskScheduler().getId());

            // Move the socket from the current scheduler to the scheduler of the session's environment:
            envir().taskScheduler().disableBackgroundHandling(fOurSocket);

            sms->envir().taskScheduler().setBackgroundHandling(fOurSocket, SOCKET_READABLE | SOCKET_EXCEPTION,

                                                               incomingRequestHandler, this);

            updateEnvir(sms->envir());

            handleCmd_sessionNotFound();

          } else {

            handleCmd_REGISTER(cmdName, url, urlSuffix, (char const *)fRequestBuffer, reuseConnection, deliverViaTCP, proxyURLSuffix);

          }

          delete[] proxyURLSuffix;

        } else {

          handleCmd_bad();

        }

        delete[] url;

      }

……………………

在RTSPClient.cpp中更新函数:

// Processes "newBytesRead" bytes that have just been read into "fResponseBuffer".
//
//  - On a read error (newBytesRead <= 0) or a full buffer, the pending request(s) are failed via
//    handleRequestError() and the function returns.
//  - Otherwise the buffered data is scanned for the end of the headers (<CR><LF><CR><LF>).  Once found,
//    the status line and headers are parsed, the matching request is looked up by "CSeq:", any body
//    announced by "Content-Length:" is awaited, command-specific handling is applied, and finally the
//    request's response handler is called.
//  - Data that does not parse as a RTSP response is passed to handleIncomingRequest().
//  - If extra bytes follow a complete response (pipelined responses), they are moved to the front of
//    the buffer and the outer loop processes them as well.
void RTSPClient::handleResponseBytes(int newBytesRead)

{

  do {

    if(newBytesRead >= 0 && (unsigned)newBytesRead < fResponseBufferBytesLeft) { break; }  // data was read OK; process it below

    if(newBytesRead >= (int)fResponseBufferBytesLeft) {

      // We filled up our response buffer.  Treat this as an error (for the first response handler):

      envir().setResultMsg("RTSP response was truncated. Increase \"RTSPClient::responseBufferSize\"");

    }

    // An error occurred while reading our TCP socket.  Call all pending response handlers, indicating this error.

    // (However, the "RTSP response was truncated" error is applied to the first response handler only.)

    resetResponseBuffer();

    RequestRecord *request;

    if(newBytesRead > 0) {  // The "RTSP response was truncated" error

      if((request = fRequestsAwaitingResponse.dequeue()) != NULL) {

        handleRequestError(request);

        delete request;

      }

    } else {

      RequestQueue requestQueue(fRequestsAwaitingResponse);

      resetTCPSockets(); // do this now, in case an error handler deletes "this"

      while((request = requestQueue.dequeue()) != NULL) {

        handleRequestError(request);

        delete request;

      }

      // The link used for REGISTER was lost: issue a local "DEREGISTER".
      // NOTE(review): this reads members after the error handlers have run (which, per the comment above,
      // may delete "this"), and after resetTCPSockets() - confirm that this is safe and that
      // "fOutputSocketNum" is the intended socket argument at this point.
      if(fServerAddress == -1 && fOutputSocketNum < 0) {

        RTSP_WARN("rtsp register link was disconnect!\n");

        handleRegister("DEREGISTER", NULL, NULL, fOutputSocketNum);

      }

    }

    return;

  } while(0);

  // Account for the new bytes, and keep the buffered data '\0'-terminated:
  fResponseBufferBytesLeft -= newBytesRead;

  fResponseBytesAlreadySeen += newBytesRead;

  fResponseBuffer[fResponseBytesAlreadySeen] = '\0';

#ifdef DEBUG_RECEIVED

  envir() << "Received " << newBytesRead << " new bytes of response data.\n";

#endif

  unsigned numExtraBytesAfterResponse = 0;

  Boolean responseSuccess = False; // by default

  do {

    // Data was read OK.  Look through the data that we've read so far, to see if it contains <CR><LF><CR><LF>.

    // (If not, wait for more data to arrive.)

    Boolean endOfHeaders = False;

    char const *ptr = fResponseBuffer;

    if(fResponseBytesAlreadySeen > 3) {

      char const *const ptrEnd = &fResponseBuffer[fResponseBytesAlreadySeen - 3];

      while(ptr < ptrEnd) {

        if(*ptr++ == '\r' && *ptr++ == '\n' && *ptr++ == '\r' && *ptr++ == '\n') {

          // This is it

          endOfHeaders = True;

          break;

        }

      }

    }

    if(!endOfHeaders) { return; }  // subsequent reads will be needed to get the complete response

    // Now that we have the complete response headers (ending with <CR><LF><CR><LF>), parse them to get the response code, CSeq,

    // and various other header parameters.  To do this, we first make a copy of the received header data, because we'll be

    // modifying it by adding '\0' bytes.

    char *headerDataCopy;

    unsigned responseCode = 200;

    char const *responseStr = NULL;

    RequestRecord *foundRequest = NULL;

    char const *sessionParamsStr = NULL;

    char const *transportParamsStr = NULL;

    char const *scaleParamsStr = NULL;

    char const *speedParamsStr = NULL;

    char const *rangeParamsStr = NULL;

    char const *rtpInfoParamsStr = NULL;

    char const *wwwAuthenticateParamsStr = NULL;

    char const *publicParamsStr = NULL;

    char *bodyStart = NULL;

    unsigned numBodyBytes = 0;

    responseSuccess = False;

    do {

      headerDataCopy = new char[responseBufferSize];

      strncpy(headerDataCopy, fResponseBuffer, fResponseBytesAlreadySeen);

      headerDataCopy[fResponseBytesAlreadySeen] = '\0';

      char *lineStart;

      char *nextLineStart = headerDataCopy;

      do {

        lineStart = nextLineStart;

        nextLineStart = getLine(lineStart);

      } while(lineStart[0] == '\0' && nextLineStart != NULL);  // skip over any blank lines at the start

      if(!parseResponseCode(lineStart, responseCode, responseStr)) {

        // This does not appear to be a RTSP response; perhaps it's a RTSP request instead?

        handleIncomingRequest();

        break; // we're done with this data

      }

      // Scan through the headers, handling the ones that we're interested in:

      Boolean reachedEndOfHeaders;

      unsigned cseq = 0;

      unsigned contentLength = 0;

      while(1) {

        reachedEndOfHeaders = True; // by default; may get changed below

        lineStart = nextLineStart;

        if(lineStart == NULL) { break; }

        nextLineStart = getLine(lineStart);

        if(lineStart[0] == '\0') { break; }  // this is a blank line

        reachedEndOfHeaders = False;

        char const *headerParamsStr;

        if(checkForHeader(lineStart, "CSeq:", 5, headerParamsStr)) {

          if(sscanf(headerParamsStr, "%u", &cseq) != 1 || cseq <= 0) {

            envir().setResultMsg("Bad \"CSeq:\" header: \"", lineStart, "\"");

            break;

          }

          // Find the handler function for "cseq":

          RequestRecord *request;

          while((request = fRequestsAwaitingResponse.dequeue()) != NULL) {

            if(request->cseq() < cseq) {  // assumes that the CSeq counter will never wrap around

              // We never received (and will never receive) a response for this handler, so delete it:

              if(fVerbosityLevel >= 1 && strcmp(request->commandName(), "POST") != 0) {

                envir() << "WARNING: The server did not respond to our \"" << request->commandName() << "\" request (CSeq: "

                        << request->cseq() << ").  The server appears to be buggy (perhaps not handling pipelined requests properly).\n";

              }

              delete request;

            } else if(request->cseq() == cseq) {

              // This is the handler that we want. Remove its record, but remember it, so that we can later call its handler:

              foundRequest = request;

              break;

            } else { // request->cseq() > cseq

              // No handler was registered for this response, so ignore it.

              break;

            }

          }

        } else if(checkForHeader(lineStart, "Content-Length:", 15, headerParamsStr)) {

          if(sscanf(headerParamsStr, "%u", &contentLength) != 1) {

            envir().setResultMsg("Bad \"Content-Length:\" header: \"", lineStart, "\"");

            break;

          }

        } else if(checkForHeader(lineStart, "Content-Base:", 13, headerParamsStr)) {

          setBaseURL(headerParamsStr);

        } else if(checkForHeader(lineStart, "Session:", 8, sessionParamsStr)) {

        } else if(checkForHeader(lineStart, "Transport:", 10, transportParamsStr)) {

        } else if(checkForHeader(lineStart, "Scale:", 6, scaleParamsStr)) {

        } else if(checkForHeader(lineStart, "Speed:",

                                 // NOTE: Should you feel the need to modify this code,

                                 6,

                                 // please first email the "live-devel" mailing list

                                 speedParamsStr

                                 // (see http://live555.com/liveMedia/faq.html#mailing-list-address for details),

                                )) {

          // to check whether your proposed modification is appropriate/correct,

        } else if(checkForHeader(lineStart, "Range:",

                                 // and, if so, whether instead it could be included in

                                 6,

                                 // a future release of the "LIVE555 Streaming Media" software,

                                 rangeParamsStr

                                 // so that other projects that use the code could benefit (not just your own project).

                                )) {

        } else if(checkForHeader(lineStart, "RTP-Info:", 9, rtpInfoParamsStr)) {

        } else if(checkForHeader(lineStart, "WWW-Authenticate:", 17, headerParamsStr)) {

          // If we've already seen a "WWW-Authenticate:" header, then we replace it with this new one only if

          // the new one specifies "Digest" authentication:

          if(wwwAuthenticateParamsStr == NULL || _strncasecmp(headerParamsStr, "Digest", 6) == 0) {

            wwwAuthenticateParamsStr = headerParamsStr;

          }

        } else if(checkForHeader(lineStart, "Public:", 7, publicParamsStr)) {

        } else if(checkForHeader(lineStart, "Allow:", 6, publicParamsStr)) {

          // Note: we accept "Allow:" instead of "Public:", so that "OPTIONS" requests made to HTTP servers will work.

        } else if(checkForHeader(lineStart, "Location:", 9, headerParamsStr)) {

          setBaseURL(headerParamsStr);

        } else if(checkForHeader(lineStart, "com.ses.streamID:", 17, headerParamsStr)) {

          // Replace the tail of the 'base URL' with the value of this header parameter:

          char *oldBaseURLTail = strrchr(fBaseURL, '/');

          if(oldBaseURLTail != NULL) {

            // 8 == strlen("/stream="):
            unsigned newBaseURLLen

              = (oldBaseURLTail - fBaseURL) + 8 + strlen(headerParamsStr);

            char *newBaseURL = new char[newBaseURLLen + 1];

            // Note: We couldn't use "asprintf()", because some compilers don't support it

            sprintf(newBaseURL, "%.*s/stream=%s",

                    (int)(oldBaseURLTail - fBaseURL), fBaseURL, headerParamsStr);

            setBaseURL(newBaseURL);

            delete[] newBaseURL;

          }

        } else if(checkForHeader(lineStart, "Connection:", 11, headerParamsStr)) {

          if(fTunnelOverHTTPPortNum == 0 && _strncasecmp(headerParamsStr, "Close", 5) == 0) {

            resetTCPSockets();

          }

        }

      }

      if(!reachedEndOfHeaders) { break; }  // an error occurred

      if(foundRequest == NULL) {

        // Hack: The response didn't have a "CSeq:" header; assume it's for our most recent request:

        if(!cseq) {

          foundRequest = fRequestsAwaitingResponse.dequeue();

        } else {

          // The response carries a "CSeq:" that matches none of our own pending requests (it appears to
          // belong to another client sharing this link), so pass it on to "incomingDataHandler2()":

          incomingDataHandler2(cseq, fResponseBuffer);

          break;

        }

      }

      // If we saw a "Content-Length:" header, then make sure that we have the amount of data that it specified:

      unsigned bodyOffset = nextLineStart == NULL ? fResponseBytesAlreadySeen : nextLineStart - headerDataCopy;

      bodyStart = &fResponseBuffer[bodyOffset];

      numBodyBytes = fResponseBytesAlreadySeen - bodyOffset;

      if(contentLength > numBodyBytes) {

        // We need to read more data.  First, make sure we have enough space for it:

        unsigned numExtraBytesNeeded = contentLength - numBodyBytes;

        unsigned remainingBufferSize = responseBufferSize - fResponseBytesAlreadySeen;

        if(numExtraBytesNeeded > remainingBufferSize) {

          char tmpBuf[200];

          sprintf(tmpBuf, "Response buffer size (%d) is too small for \"Content-Length:\" %d (need a buffer size of >= %d bytes\n",

                  responseBufferSize, contentLength, fResponseBytesAlreadySeen + numExtraBytesNeeded);

          envir().setResultMsg(tmpBuf);

          break;

        }

        if(fVerbosityLevel >= 1) {

          envir() << "Have received " << fResponseBytesAlreadySeen << " total bytes of a "

                  << (foundRequest != NULL ? foundRequest->commandName() : "(unknown)")

                  << " RTSP response; awaiting " << numExtraBytesNeeded << " bytes more.\n";

        }

        delete[] headerDataCopy;

        if(foundRequest != NULL) { fRequestsAwaitingResponse.putAtHead(foundRequest); }   // put our request record back; we need it again

        return; // We need to read more data

      }

      // We now have a complete response (including all bytes specified by the "Content-Length:" header, if any).

      char *responseEnd = bodyStart + contentLength;

      numExtraBytesAfterResponse = &fResponseBuffer[fResponseBytesAlreadySeen] - responseEnd;

      // Temporarily terminate the buffer at the end of this response, so that only this response is logged:
      char saved = *responseEnd;

      *responseEnd = '\0';

      // Successful "GET_PARAMETER" responses are logged with RTSP_DBUG; everything else with RTSP_INFO:
      if(responseCode == 200 && foundRequest != NULL && strcmp(foundRequest->commandName(), "GET_PARAMETER") == 0) {

        RTSP_DBUG("$$$$$$$$$$$$ProxyRtspClient[%ld] Received a complete %s response: \n%s",

                  IFLY_GetThreadId(), foundRequest != NULL ? foundRequest->commandName() : "(unknown)", fResponseBuffer);

      } else {

        RTSP_INFO("$$$$$$$$$$$$ProxyRtspClient[%ld] Received a complete %s response: \n%s",

                  IFLY_GetThreadId(), foundRequest != NULL ? foundRequest->commandName() : "(unknown)", fResponseBuffer);

      }

      if(numExtraBytesAfterResponse > 0) {

        RTSP_INFO("(plus %d additional bytes)\n", numExtraBytesAfterResponse);

      }

      *responseEnd = saved;

      if(foundRequest != NULL) {

        Boolean needToResendCommand = False; // by default...

        if(responseCode == 200) {

          // Do special-case response handling for some commands:

          if(strcmp(foundRequest->commandName(), "SETUP") == 0) {

            if(!handleSETUPResponse(*foundRequest->subsession(), sessionParamsStr, transportParamsStr, foundRequest->booleanFlags() & 0x1)) { break; }

          } else if(strcmp(foundRequest->commandName(), "PLAY") == 0) {

            if(!handlePLAYResponse(foundRequest->session(), foundRequest->subsession(), scaleParamsStr, speedParamsStr, rangeParamsStr, rtpInfoParamsStr)) { break; }

          } else if(strcmp(foundRequest->commandName(), "TEARDOWN") == 0) {

            if(!handleTEARDOWNResponse(*foundRequest->session(), *foundRequest->subsession())) { break; }

          } else if(strcmp(foundRequest->commandName(), "GET_PARAMETER") == 0) {

            if(!handleGET_PARAMETERResponse(foundRequest->contentStr(), bodyStart, responseEnd)) { break; }

          }

        } else if(responseCode == 401 && handleAuthenticationFailure(wwwAuthenticateParamsStr)) {

          // We need to resend the command, with an "Authorization:" header:

          needToResendCommand = True;

          if(strcmp(foundRequest->commandName(), "GET") == 0) {

            // Note: If a HTTP "GET" command (for RTSP-over-HTTP tunneling) returns "401 Unauthorized", then we resend it

            // (with an "Authorization:" header), just as we would for a RTSP command.  However, we do so using a new TCP connection,

            // because some servers close the original connection after returning the "401 Unauthorized".

            resetTCPSockets(); // forces the opening of a new connection for the resent command

          }

        } else if(responseCode == 301 || responseCode == 302) {  // redirection

          resetTCPSockets(); // because we need to connect somewhere else next

          needToResendCommand = True;

        }

        if(needToResendCommand) {

          resetResponseBuffer();

          (void)resendCommand(foundRequest);

          delete[] headerDataCopy;

          return; // without calling our response handler; the response to the resent command will do that

        }

      }

      responseSuccess = True;

    } while(0);

    // If we have a handler function for this response, call it.

    // But first, reset our response buffer, in case the handler goes to the event loop, and we end up getting called recursively:

    if(numExtraBytesAfterResponse > 0) {

      // An unusual case; usually due to having received pipelined responses.  Move the extra bytes to the front of the buffer:

      char *responseEnd = &fResponseBuffer[fResponseBytesAlreadySeen - numExtraBytesAfterResponse];

      // But first: A hack to save a copy of the response 'body', in case it's needed below for "resultString":

      numBodyBytes -= numExtraBytesAfterResponse;

      if(numBodyBytes > 0) {

        char saved = *responseEnd;

        *responseEnd = '\0';

        bodyStart = strDup(bodyStart);

        *responseEnd = saved;

      }

      memmove(fResponseBuffer, responseEnd, numExtraBytesAfterResponse);

      fResponseBytesAlreadySeen = numExtraBytesAfterResponse;

      fResponseBufferBytesLeft = responseBufferSize - numExtraBytesAfterResponse;

      fResponseBuffer[numExtraBytesAfterResponse] = '\0';

    } else {

      resetResponseBuffer();

    }

    if(foundRequest != NULL && foundRequest->handler() != NULL) {

      int resultCode;

      char *resultString;

      if(responseSuccess) {

        if(responseCode == 200) {

          resultCode = 0;

          resultString = numBodyBytes > 0 ? strDup(bodyStart) : strDup(publicParamsStr);

          // Note: The "strDup(bodyStart)" call assumes that the body is encoded without interior '\0' bytes

        } else {

          resultCode = responseCode;

          resultString = strDup(responseStr);

          envir().setResultMsg(responseStr);

        }

        (*foundRequest->handler())(this, resultCode, resultString);

      } else {

        // An error occurred parsing the response, so call the handler, indicating an error:

        handleRequestError(foundRequest);

      }

    }

    delete foundRequest;

    delete[] headerDataCopy;

    // "bodyStart" points to a strDup()'d copy only if it was replaced in the 'extra bytes' case above:
    if(numExtraBytesAfterResponse > 0 && numBodyBytes > 0) { delete[] bodyStart; }

  } while(numExtraBytesAfterResponse > 0 && responseSuccess);

}

// Reports a failed request to its response handler (if it has one).
//
// The handler receives a negative result code - the negated value of envir().getErrno(), or a generic
// "not connected" code if that value is 0 - together with a strDup()'d copy of the environment's
// current result message.
void RTSPClient::handleRequestError(RequestRecord *request)
{
  // Generic error code, used when the environment reports no specific error:
#if defined(__WIN32__) || defined(_WIN32) || defined(_QNX4)
  int const genericErrorCode = -WSAENOTCONN;
#else
  int const genericErrorCode = -ENOTCONN;
#endif

  int const envErrno = envir().getErrno();
  int const resultCode = (envErrno != 0) ? -envErrno : genericErrorCode;

  if(request->handler() == NULL) { return; }  // nobody to notify

  (*request->handler())(this, resultCode, strDup(envir().getResultMsg()));
}

void RTSPClient::handleIncomingRequest()

{

  // Parse the request string into command name and 'CSeq', then 'handle' the command (by responding that we don't support it):

  char cmdName[RTSP_PARAM_STRING_MAX] = {0};

  char urlPreSuffix[RTSP_PARAM_STRING_MAX] = {0};

  char urlSuffix[RTSP_PARAM_STRING_MAX] = {0};

  char cseq[RTSP_PARAM_STRING_MAX] = {0};

  char sessionId[RTSP_PARAM_STRING_MAX] = {0};

  unsigned contentLength;

  Boolean reuseConnection, deliverViaTCP;

  char *proxyURLSuffix = NULL;

  if(!parseRTSPRequestString(fResponseBuffer, fResponseBytesAlreadySeen,

                             cmdName, sizeof cmdName,

                             urlPreSuffix, sizeof urlPreSuffix,

                             urlSuffix, sizeof urlSuffix,

                             cseq, sizeof cseq,

                             sessionId, sizeof sessionId,

                             contentLength)) {

    return;

  } else {

    RTSP_INFO("###RtspClient[%ld] received incoming RTSP request: \n%s\n", IFLY_GetThreadId(), fResponseBuffer);

    char tmpBuf[2 * RTSP_PARAM_STRING_MAX] = {0};

    if(strcmp(cmdName, "REGISTER") == 0) {

      snprintf((char *)tmpBuf, sizeof tmpBuf,

               "RTSP/1.0 200 OK\r\n"

               "CSeq: %s\r\n"

               "%s\r\n",

               cseq,

               dateHeader());

      parseTransportHeaderForREGISTER((const char *)fResponseBuffer, reuseConnection, deliverViaTCP, proxyURLSuffix);

      if(proxyURLSuffix && sscanf((char *)fResponseBuffer, "%*s %s", urlSuffix) == 1) {

        handleRegister(cmdName, proxyURLSuffix, urlSuffix, fOutputSocketNum);

      }

    } else if(strcmp(cmdName, "DEREGISTER") == 0) {

      snprintf((char *)tmpBuf, sizeof tmpBuf,

               "RTSP/1.0 200 OK\r\n"

               "CSeq: %s\r\n"

               "%s\r\n",

               cseq,

               dateHeader());

      parseTransportHeaderForREGISTER((const char *)fResponseBuffer, reuseConnection, deliverViaTCP, proxyURLSuffix);

      if(proxyURLSuffix && sscanf((char *)fResponseBuffer, "%*s %s", urlSuffix) == 1) {

        handleRegister(cmdName, proxyURLSuffix, urlSuffix, fOutputSocketNum);

      }

    } else {

      snprintf((char *)tmpBuf, sizeof tmpBuf,

               "RTSP/1.0 405 Method Not Allowed\r\nCSeq: %s\r\n\r\n", cseq);

    }

    RTSP_INFO("###RtspClient[%ld] sending incoming RTSP response: \n%s\n", IFLY_GetThreadId(), tmpBuf);

    send(fOutputSocketNum, tmpBuf, strlen(tmpBuf), 0);

    delete[] proxyURLSuffix;

  }

}

最重要的就是上面这些吧!其他可能还有一些地方就自己去调试吧!

至于上面这种方法是否合适你的应用场景,我就不得而知了。

继续阅读