red5源碼分析—用戶端連接配接
本博文開始分析red5伺服器以及用戶端的源碼,選取的版本為最新的1.0.7。
red5發展到現在,可以相容很多流媒體傳輸協定,例如rtmp、rtmpt等等。本博文隻分析rtmp協定,其他協定看看以後有沒有時間研究吧。
red5的伺服器啟動有好幾種方式,standalone、tomcat、jetty等等。本博文隻分析standalone的啟動方式。本博文假設用戶端為red5 rtmpclient。
red5 client和server的下載下傳位址如下
https://github.com/Red5
首先看一段網上很常見的red5的用戶端代碼,如下所示
public class RtmpClientTest extends RTMPClient implements
INetStreamEventHandler, IPendingServiceCallback, IEventDispatcher {
private ConcurrentLinkedQueue<IMessage> frameBuffer = new ConcurrentLinkedQueue<IMessage>();
String host = "127.0.0.1";
String app = "red5test";
int port = ;
public RtmpClientTest() {
super();
Map<String, Object> map = makeDefaultConnectionParams(host,
, "red5test");
connect(host, , map, this);
}
@Override
public void dispatchEvent(IEvent arg0) {
}
@Override
public void resultReceived(IPendingServiceCall call) {
Object result = call.getResult();
if (result instanceof ObjectMap) {
if ("connect".equals(call.getServiceMethodName())) {
createStream(this);
}
} else {
if ("createStream".equals(call.getServiceMethodName())) {
if (result instanceof Integer) {
Integer streamIdInt = (Integer) result;
// int streamId = streamIdInt.intValue();
// publish(streamId, "testgio2", "live", this);
invoke("getRoomsInfo", this);
} else {
disconnect();
}
} else if ("getRoomsInfo".equals(call.getServiceMethodName())) {
ArrayList<String> list = (ArrayList<String>) result;
for (int i = ; i < list.size(); i++) {
System.out.println(list.get(i));
}
}
}
}
@Override
public void onStreamEvent(Notify arg0) {
ObjectMap<?, ?> map = (ObjectMap<?, ?>) notify.getCall().getArguments()[];
String code = (String) map.get("code");
if (StatusCodes.NS_PUBLISH_START.equals(code)) {
IMessage message = null;
while ((message = frameBuffer.poll()) != null) {
this.publishStreamData(streamId, message);
}
} else if (StatusCodes.NS_UNPUBLISHED_SUCCESS.equals(code)) {
}
}
@Override
public void connectionOpened(RTMPConnection conn, RTMP state) {
super.connectionOpened(conn, state);
}
public static void main(String[] args) {
new RtmpClientTest();
}
}
RtmpClientTest實作的三個接口INetStreamEventHandler、IPendingServiceCallback、IEventDispatcher和回調函數有關,後面的章節會分析到。
首先來看RtmpClientTest的構造函數,其父類RTMPClient的構造函數如下
public RTMPClient() {
ioHandler = new RTMPMinaIoHandler();
ioHandler.setHandler(this);
}
red5用戶端使用mina架構來封裝Java Nio,關于mina架構的源碼分析請檢視部落客的mina源碼分析系列文章。這裡有兩個handler,RTMPMinaIoHandler和mina架構有關,另一個handler就是RTMPClient自身,因為其繼承自BaseRTMPClientHandler,BaseRTMPClientHandler和業務有關。
回到RtmpClientTest構造函數,接下來調用connect進行連接配接,connect函數實作在RTMPClient的父類BaseRTMPClientHandler中,
public void connect(String server, int port, Map<String, Object> connectionParams, IPendingServiceCallback connectCallback) {
connect(server, port, connectionParams, connectCallback, null);
}
public void connect(String server, int port, Map<String, Object> connectionParams, IPendingServiceCallback connectCallback, Object[] connectCallArguments) {
this.connectionParams = connectionParams;
this.connectArguments = connectCallArguments;
if (!connectionParams.containsKey("objectEncoding")) {
connectionParams.put("objectEncoding", );
}
this.connectCallback = connectCallback;
startConnector(server, port);
}
connect函數一開始作了一些簡單的設定,最後通過startConnector與伺服器建立連接配接。startConnector定義在RTMPClient中,
protected void startConnector(String server, int port) {
socketConnector = new NioSocketConnector();
socketConnector.setHandler(ioHandler);
future = socketConnector.connect(new InetSocketAddress(server, port));
future.addListener(new IoFutureListener<ConnectFuture>() {
public void operationComplete(ConnectFuture future) {
try {
session = future.getSession();
} catch (Throwable e) {
socketConnector.dispose(false);
handleException(e);
}
}
});
future.awaitUninterruptibly(CONNECTOR_WORKER_TIMEOUT);
}
這裡主要構造了一個NioSocketConnector,并調用其connect函數。connect函數會使用mina架構與伺服器建立連接配接,下一章會分析伺服器如何處理用戶端的連接配接請求。當與伺服器建立完連接配接(TCP連接配接)時,根據mina架構的源碼,會回調mina架構中IoHandler的處理函數,也即前面注冊的RTMPMinaIoHandler的sessionCreated和sessionOpened函數,下面依次來看。
一. sessionCreated
sessionCreated的代碼如下,
public void sessionCreated(IoSession session) throws Exception {
session.getFilterChain().addFirst("rtmpeFilter", new RTMPEIoFilter());
RTMPMinaConnection conn = createRTMPMinaConnection();
conn.setIoSession(session);
session.setAttribute(RTMPConnection.RTMP_SESSION_ID, conn.getSessionId());
OutboundHandshake outgoingHandshake = new OutboundHandshake();
session.setAttribute(RTMPConnection.RTMP_HANDSHAKE, outgoingHandshake);
if (enableSwfVerification) {
String swfUrl = (String) handler.getConnectionParams().get("swfUrl");
if (!StringUtils.isEmpty(swfUrl)) {
outgoingHandshake.initSwfVerification(swfUrl);
}
}
session.setAttribute(RTMPConnection.RTMP_HANDLER, handler);
handler.setConnection((RTMPConnection) conn);
}
sessionCreated函數首先向mina架構中添加一個過濾器RTMPEIoFilter,該過濾器用來處理RTMP協定的握手過程,具體的RTMP協定可以從網上下載下傳。sessionCreated接着建立一個RTMPMinaConnection并進行相應的設定,
protected RTMPMinaConnection createRTMPMinaConnection() {
return (RTMPMinaConnection) RTMPConnManager.getInstance().createConnection(RTMPMinaConnection.class);
}
RTMPConnManager使用單例模式,其createConnection函數如下,
public RTMPConnection createConnection(Class<?> connCls) {
RTMPConnection conn = null;
if (RTMPConnection.class.isAssignableFrom(connCls)) {
try {
conn = createConnectionInstance(connCls);
connMap.put(conn.getSessionId(), conn);
} catch (Exception ex) {
}
}
return conn;
}
public RTMPConnection createConnectionInstance(Class<?> cls) throws Exception {
RTMPConnection conn = null;
if (cls == RTMPMinaConnection.class) {
conn = (RTMPMinaConnection) cls.newInstance();
} else if (cls == RTMPTClientConnection.class) {
conn = (RTMPTClientConnection) cls.newInstance();
} else {
conn = (RTMPConnection) cls.newInstance();
}
conn.setMaxHandshakeTimeout(maxHandshakeTimeout);
conn.setMaxInactivity(maxInactivity);
conn.setPingInterval(pingInterval);
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize();
executor.setDaemon(true);
executor.setMaxPoolSize();
executor.setQueueCapacity(executorQueueCapacity);
executor.initialize();
conn.setExecutor(executor);
return conn;
}
這裡就是執行個體化一個RTMPMinaConnection,建立ThreadPoolTaskExecutor并進行相應的設定,最後添加進connMap中。注意每個RTMPMinaConnection的SessionId是随機生成的。
回到sessionCreated中,接下來設定剛剛構造的RTMPMinaConnection,以及其SessionId,然後建立OutboundHandshake用于RTMP協定的握手,握手結束後該OutboundHandshake将會從session中移除,最後設定handler和RTMPMinaConnection。
二. sessionOpened
再來看RTMPMinaIoHandler的sessionOpened函數,代碼如下
public void sessionOpened(IoSession session) throws Exception {
super.sessionOpened(session);
RTMPHandshake handshake = (RTMPHandshake) session.getAttribute(RTMPConnection.RTMP_HANDSHAKE);
IoBuffer clientRequest1 = ((OutboundHandshake) handshake).generateClientRequest1();
session.write(clientRequest1);
}
這裡根據從session中獲得剛剛在sessionCreated中建立的OutboundHandshake,調用其generateClientRequest1函數生成第一次握手請求的資料,通過write函數發送給伺服器。generateClientRequest1函數和具體的協定相關,這裡就不繼續往下看了。
總結一下,本章分析了如何建立一個RTMPClient并建立與伺服器的TCP連接配接,然後發送第一次握手請求開始與伺服器建立RTMP連接配接,下一章開始分析red5伺服器端對應的連接配接函數。