天天看點

red5源碼分析---1red5源碼分析—用戶端連接配接

red5源碼分析—用戶端連接配接

本博文開始分析red5伺服器以及用戶端的源碼,選取的版本為最新的1.0.7。

red5發展到現在,可以相容很多流媒體傳輸協定,例如rtmp、rtmpt等等。本博文隻分析rtmp協定,其他協定看看以後有沒有時間研究吧。

red5的伺服器啟動有好幾種方式,standalone、tomcat、jetty等等。本博文隻分析standalone的啟動方式。本博文假設用戶端為red5 rtmpclient。

red5 client和server的下載下傳位址如下

https://github.com/Red5

首先看一段網上很常見的red5的用戶端代碼,如下所示

public class RtmpClientTest extends RTMPClient implements  
    INetStreamEventHandler, IPendingServiceCallback, IEventDispatcher {  

    private ConcurrentLinkedQueue<IMessage> frameBuffer = new ConcurrentLinkedQueue<IMessage>();
    String host = "127.0.0.1";
    String app = "red5test";  
    int port = ;  

    public RtmpClientTest() {  
        super();  
        Map<String, Object> map = makeDefaultConnectionParams(host,  
            , "red5test");  
        connect(host, , map, this);  
    }  

    @Override  
    public void dispatchEvent(IEvent arg0) {   
    }  

    @Override  
    public void resultReceived(IPendingServiceCall call) { 
        Object result = call.getResult();  
        if (result instanceof ObjectMap) {  
            if ("connect".equals(call.getServiceMethodName())) {  
                createStream(this);  
            }  
        } else {  
            if ("createStream".equals(call.getServiceMethodName())) {  
                if (result instanceof Integer) {  
                    Integer streamIdInt = (Integer) result;  
                    // int streamId = streamIdInt.intValue();  
                    // publish(streamId, "testgio2", "live", this);  
                    invoke("getRoomsInfo", this);  
                } else {  
                    disconnect();  
                }  
            } else if ("getRoomsInfo".equals(call.getServiceMethodName())) {  
                ArrayList<String> list = (ArrayList<String>) result;  
                for (int i = ; i < list.size(); i++) {  
                    System.out.println(list.get(i));  
                }  
            }  
        }  
    }  

    @Override  
    public void onStreamEvent(Notify arg0) {   
        ObjectMap<?, ?> map = (ObjectMap<?, ?>) notify.getCall().getArguments()[];
        String code = (String) map.get("code");
        if (StatusCodes.NS_PUBLISH_START.equals(code)) {
            IMessage message = null;
            while ((message = frameBuffer.poll()) != null) {
                this.publishStreamData(streamId, message);
            }
        } else if (StatusCodes.NS_UNPUBLISHED_SUCCESS.equals(code)) {

        }
    }  

    @Override  
    public void connectionOpened(RTMPConnection conn, RTMP state) {   
        super.connectionOpened(conn, state);  
    }  

    public static void main(String[] args) {  
        new RtmpClientTest();  
    }  
}
           

RtmpClientTest實作的三個接口INetStreamEventHandler、IPendingServiceCallback、IEventDispatcher和回調函數有關,後面的章節會分析到。

首先來看RtmpClientTest的構造函數,其父類RTMPClient的構造函數如下

public RTMPClient() {
        ioHandler = new RTMPMinaIoHandler();
        ioHandler.setHandler(this);
    }
           

red5用戶端使用mina架構來封裝Java Nio,關于mina架構的源碼分析請檢視部落客的mina源碼分析系列文章。這裡有兩個handler,RTMPMinaIoHandler和mina架構有關,另一個handler就是RTMPClient自身,因為其繼承自BaseRTMPClientHandler,BaseRTMPClientHandler和業務有關。

回到RtmpClientTest構造函數,接下來調用connect進行連接配接,connect函數實作在RTMPClient的父類BaseRTMPClientHandler中,

public void connect(String server, int port, Map<String, Object> connectionParams, IPendingServiceCallback connectCallback) {
        connect(server, port, connectionParams, connectCallback, null);
    }

    public void connect(String server, int port, Map<String, Object> connectionParams, IPendingServiceCallback connectCallback, Object[] connectCallArguments) {
        this.connectionParams = connectionParams;
        this.connectArguments = connectCallArguments;
        if (!connectionParams.containsKey("objectEncoding")) {
            connectionParams.put("objectEncoding", );
        }
        this.connectCallback = connectCallback;
        startConnector(server, port);
    }
           

connect函數一開始作了一些簡單的設定,最後通過startConnector與伺服器建立連接配接。startConnector定義在RTMPClient中,

protected void startConnector(String server, int port) {
        socketConnector = new NioSocketConnector();
        socketConnector.setHandler(ioHandler);
        future = socketConnector.connect(new InetSocketAddress(server, port));
        future.addListener(new IoFutureListener<ConnectFuture>() {
            public void operationComplete(ConnectFuture future) {
                try {
                    session = future.getSession();
                } catch (Throwable e) {
                    socketConnector.dispose(false);
                    handleException(e);
                }
            }
        });
        future.awaitUninterruptibly(CONNECTOR_WORKER_TIMEOUT);
    }
           

這裡主要構造了一個NioSocketConnector,并調用其connect函數。connect函數會使用mina架構與伺服器建立連接配接,下一章會分析伺服器如何處理用戶端的連接配接請求。當與伺服器建立完連接配接(TCP連接配接)時,根據mina架構的源碼,會回調mina架構中IoHandler的處理函數,也即前面注冊的RTMPMinaIoHandler的sessionCreated和sessionOpened函數,下面依次來看。

一. sessionCreated

sessionCreated的代碼如下,

public void sessionCreated(IoSession session) throws Exception {
        session.getFilterChain().addFirst("rtmpeFilter", new RTMPEIoFilter());
        RTMPMinaConnection conn = createRTMPMinaConnection();
        conn.setIoSession(session);
        session.setAttribute(RTMPConnection.RTMP_SESSION_ID, conn.getSessionId());
        OutboundHandshake outgoingHandshake = new OutboundHandshake();
        session.setAttribute(RTMPConnection.RTMP_HANDSHAKE, outgoingHandshake);
        if (enableSwfVerification) {
            String swfUrl = (String) handler.getConnectionParams().get("swfUrl");
            if (!StringUtils.isEmpty(swfUrl)) {
                outgoingHandshake.initSwfVerification(swfUrl);
            }
        }
        session.setAttribute(RTMPConnection.RTMP_HANDLER, handler);
        handler.setConnection((RTMPConnection) conn);
    }
           

sessionCreated函數首先向mina架構中添加一個過濾器RTMPEIoFilter,該過濾器用來處理RTMP協定的握手過程,具體的RTMP協定可以從網上下載下傳。sessionCreated接着建立一個RTMPMinaConnection并進行相應的設定,

protected RTMPMinaConnection createRTMPMinaConnection() {
        return (RTMPMinaConnection) RTMPConnManager.getInstance().createConnection(RTMPMinaConnection.class);
    }
           

RTMPConnManager使用單例模式,其createConnection函數如下,

public RTMPConnection createConnection(Class<?> connCls) {
        RTMPConnection conn = null;
        if (RTMPConnection.class.isAssignableFrom(connCls)) {
            try {
                conn = createConnectionInstance(connCls);
                connMap.put(conn.getSessionId(), conn);
            } catch (Exception ex) {

            }
        }
        return conn;
    }

    public RTMPConnection createConnectionInstance(Class<?> cls) throws Exception {
        RTMPConnection conn = null;
        if (cls == RTMPMinaConnection.class) {
            conn = (RTMPMinaConnection) cls.newInstance();
        } else if (cls == RTMPTClientConnection.class) {
            conn = (RTMPTClientConnection) cls.newInstance();
        } else {
            conn = (RTMPConnection) cls.newInstance();
        }
        conn.setMaxHandshakeTimeout(maxHandshakeTimeout);
        conn.setMaxInactivity(maxInactivity);
        conn.setPingInterval(pingInterval);

        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize();
        executor.setDaemon(true);
        executor.setMaxPoolSize();
        executor.setQueueCapacity(executorQueueCapacity);
        executor.initialize();
        conn.setExecutor(executor);
        return conn;
    }
           

這裡就是執行個體化一個RTMPMinaConnection,建立ThreadPoolTaskExecutor并進行相應的設定,最後添加進connMap中。注意每個RTMPMinaConnection的SessionId是随機生成的。

回到sessionCreated中,接下來設定剛剛構造的RTMPMinaConnection,以及其SessionId,然後建立OutboundHandshake用于RTMP協定的握手,握手結束後該OutboundHandshake将會從session中移除,最後設定handler和RTMPMinaConnection。

二. sessionOpened

再來看RTMPMinaIoHandler的sessionOpened函數,代碼如下

public void sessionOpened(IoSession session) throws Exception {
        super.sessionOpened(session);
        RTMPHandshake handshake = (RTMPHandshake) session.getAttribute(RTMPConnection.RTMP_HANDSHAKE);
        IoBuffer clientRequest1 = ((OutboundHandshake) handshake).generateClientRequest1();
        session.write(clientRequest1);
    }
           

這裡根據從session中獲得剛剛在sessionCreated中建立的OutboundHandshake,調用其generateClientRequest1函數生成第一次握手請求的資料,通過write函數發送給伺服器。generateClientRequest1函數和具體的協定相關,這裡就不繼續往下看了。

總結一下,本章分析了如何建立一個RTMPClient并建立與伺服器的TCP連接配接,然後發送第一次握手請求開始與伺服器建立RTMP連接配接,下一章開始分析red5伺服器端對應的連接配接函數。

繼續閱讀