天天看點

Openfire的啟動過程與session管理

說明

本文源碼基于Openfire4.0.2。

Openfire的啟動

    Openfire的啟動過程非常的簡單,通過一個入口初始化lib目錄下的openfire.jar包,并啟動一個XMPPServer執行個體。

下面就是ServerStarter.start方法的代碼片斷:

Class containerClass = loader.loadClass("org.jivesoftware.openfire.XMPPServer");
containerClass.newInstance();      

這樣一個openfire執行個體就已經啟動了。

XMPPServer類

這個XmppServer類是單執行個體的對象,這樣在伺服器調用時可以擷取一個執行個體。既然是個對象就會有構造的過程,XMPPServer在構造過程中會對服務進行初始化,這個過程包括:

  • 初始化配置參數
  • 檢查是否需要安裝
  • 初始化Module
  • 啟動統計子產品
  • 啟動plugin

基本就是這麼簡單,還是非常簡潔明了。這裡也可以大概知道在openfire裡主要是module和plugin兩類子產品,一般情況下内部的子產品都用module,對于一些功能的擴充或者第三方的開發擴充使用Plugin。官方其實也會自己寫一個插件來擴充功能,說明插件還是比較靈活的。

提一提Module的加載過程

下面代碼是module的加載過程

if (!setupMode) {
    verifyDataSource();
    // First load all the modules so that modules may access other modules while
    // being initialized
    loadModules();
    // Initize all the modules
    initModules();
    // Start all the modules
    startModules();
}      

可以看到,分了三個步驟:

加載子產品:是對子產品類的執行個體化過程,就是建立對象

初始化子產品:就是調用module.initialize(this);,其實就是調用子產品的初始化方法

啟動子產品:module.start();,同理就是調用啟動子產品

這是因為openfire規範了module的接口抽象Module,所有的子產品都要按照這個規範實作,看代碼:

public interface Module {
 
    /**
     * Returns the name of the module for display in administration interfaces.
     *
     * @return The name of the module.
     */
    String getName();
 
    /**
     * Initialize the module with the container.
     * Modules may be initialized and never started, so modules
     * should be prepared for a call to destroy() to follow initialize().
     *
     * @param server the server hosting this module.
     */
    void initialize(XMPPServer server);
 
    /**
     * Start the module (must return quickly). Any long running
     * operations should spawn a thread and allow the method to return
     * immediately.
     */
    void start();
 
    /**
     * Stop the module. The module should attempt to free up threads
     * and prepare for either another call to initialize (reconfigure the module)
     * or for destruction.
     */
    void stop();
 
    /**
     * Module should free all resources and prepare for deallocation.
     */
    void destroy();
}      

這也标示了Module的生命周期,Openfire會管理這些Module的生命周期,以此來保證各個子產品的啟動與釋放。

Connection管理子產品

整個啟動過程有點奇怪,并沒有看到Openfire是如何監聽端口的,如果不監聽如何獲利用戶端連接配接呢?因為Openfire隻通過Module來管理的,那麼對應的網絡管理應該就在Module中。于是在XMPPServer.loadModules方法中看到下面的代碼:

// Load this module always last since we don't want to start listening for clients
// before the rest of the modules have been started
loadModule(ConnectionManagerImpl.class.getName());      

ConnectionManagerImpl就是連接配接的管理子產品,這裡有個注釋,就是在其他子產品啟動後之後再啟動監聽子產品。

在ConnectionManagerImpl中管理了主要的連接配接,都是以ConnectionListener的來管理,這個類用于包裝連接配接。我的了解就是一個連接配接抽象吧,這樣對于代碼來說寫起來比較統一。看下面代碼中Manager管理着哪些:

private final ConnectionListener clientListener;
    private final ConnectionListener clientSslListener;
    private final ConnectionListener boshListener;
    private final ConnectionListener boshSslListener;
    private final ConnectionListener serverListener;
    private final ConnectionListener componentListener;
    private final ConnectionListener componentSslListener;
    private final ConnectionListener connectionManagerListener; // Also known as 'multiplexer'
    private final ConnectionListener connectionManagerSslListener; // Also known as 'multiplexer'
    private final ConnectionListener webAdminListener;
    private final ConnectionListener webAdminSslListener;      

這裡面除了server隻有一個外,其他的都是兩個,其中一個是SSL的。它們主要是什麼連結?

  • client:表示用戶端連接配接
  • bosh:就是HTTP綁定的連接配接
  • server:伺服器到伺服器的socket連接配接
  • component:元件到伺服器的連接配接
  • connectionManager:是指通過connectionManager連接配接器過來的連接配接
  • webAdmin:是指web控制台的連接配接

這裡面bosh和webAdmin使用的是http協定,是以連接配接并不是長連接配接,其他的都是socket。

openfire裡使用了Mina來實作socket網絡處理。隻不過看代碼中對于S2S類型的連接配接使用的不是mina,如下代碼:

if ( getType() == ConnectionType.SOCKET_S2S )
{
    connectionAcceptor = new LegacyConnectionAcceptor( generateConnectionConfiguration() );
}
else
{
    connectionAcceptor = new MINAConnectionAcceptor( generateConnectionConfiguration() );
}      

LegacyConnectionAcceptor是個廢棄的類,但不知道為什麼s2s還要用這個呢?看了看實作,LegacyConnectionAcceptor就是起了一個線程,線上程裡建了一個ServerSocket。可能以後還是會遷移這部分代碼吧。

在connectionAcceptor中會根據類型建立一個ConnectionHandler用于實作具體的業務功能,而ConnectionHandler都是基于org.apache.mina.core.service.IoHandlerAdapter派生的類,而IoHandlerAdapter又是IoHandler的适配接口,是以實質上就是IoHandler。下面是類繼承關系:

Openfire的啟動過程與session管理

在這些Handler裡完成的主要是每個連接配接打開、關閉和資料收發等操作的處理。而其中比較關鍵的一個步驟就是在sessionOpened中設定了StanzeHandler,而每種ConnectionHandler都有自己的StanzeHandler實作。以ClientConnectionHandler為例子,其中ClientConnectionHandler複寫了父類的createStanzaHandler方法,這裡面

@Override
    StanzaHandler createStanzaHandler(NIOConnection connection) {
        return new ClientStanzaHandler(XMPPServer.getInstance().getPacketRouter(), connection);
    }      

這裡使用的是clientStanzaHandler,表示是用戶端的資料節處理者。而最終的createStanzaHandler調用是在父類ConnectionHandler的sessionOpened完成的,

@Override
public void sessionOpened(IoSession session) throws Exception {
    // Create a new XML parser for the new connection. The parser will be used by the XMPPDecoder filter.
    final XMLLightweightParser parser = new XMLLightweightParser(StandardCharsets.UTF_8);
    session.setAttribute(XML_PARSER, parser);
    // Create a new NIOConnection for the new session
    final NIOConnection connection = createNIOConnection(session);
    session.setAttribute(CONNECTION, connection);
    session.setAttribute(HANDLER, createStanzaHandler(connection));
    // Set the max time a connection can be idle before closing it. This amount of seconds
    // is divided in two, as Openfire will ping idle clients first (at 50% of the max idle time)
    // before disconnecting them (at 100% of the max idle time). This prevents Openfire from
    // removing connections without warning.
    final int idleTime = getMaxIdleTime() / 2;
    if (idleTime > 0) {
        session.getConfig().setIdleTime(IdleStatus.READER_IDLE, idleTime);
    }
}      

這樣每一個session在打開時都會設定handler,而具體的handler由各個派生類建立傳回。這裡的StanzHandler就是Openfire裡的資料包處理單元。和connection類型一樣,包處理也是對應的幾個類:

Openfire的啟動過程與session管理

注:

關于openfire與mina的關系可以看看下面的文章,但是版本相對比較老些,代碼有些不同,隻不過思路差不多:

http://blog.csdn.net/huwenfeng_2011/article/details/43413009

Session子產品

對于Openfire來說一個比較重要的功能就是管理session,因為要與用戶端實時的進行資料通訊,是以必須保持着連接配接。在Openfire中對于Session的管理都集中在SessionManager子產品。但在前面說到連接配接管理時已經知道了IoSession的建立過程,但并沒有看到openfire是如何管理它的。接着ConnectionHandler和StanzaHandler就能知道其中有奧秘。

前面知道了ConnectionHandler是連接配接的處理者,這裡會有連接配接的建立、關閉、資料收發的處理,回到ConnectionHandler這個抽象類中。對于建立時(sessionOpend)主要是建立了StanzaHandler,這樣就把資料包的處理委托給了StzanzHandler(派生類)。但是這個時候并沒有将session放入到openfire的session管理子產品中,而是在用戶端發送資料過來後才開始的。

先看看ConnectionHandler的messageReceived方法:

@Override
public void messageReceived(IoSession session, Object message) throws Exception {
    // Get the stanza handler for this session
    StanzaHandler handler = (StanzaHandler) session.getAttribute(HANDLER);
    // Get the parser to use to process stanza. For optimization there is going
    // to be a parser for each running thread. Each Filter will be executed
    // by the Executor placed as the first Filter. So we can have a parser associated
    // to each Thread
    final XMPPPacketReader parser = PARSER_CACHE.get();
    // Update counter of read btyes
    updateReadBytesCounter(session);
    //System.out.println("RCVD: " + message);
    // Let the stanza handler process the received stanza
    try {
        handler.process((String) message, parser);
    } catch (Exception e) {
        Log.error("Closing connection due to error while processing message: " + message, e);
        final Connection connection = (Connection) session.getAttribute(CONNECTION);
        if ( connection != null ) {
            connection.close();
        }
 
    }
}      

在接收到資料包後擷取到StanzaHandler,然後調用了它的process方法,也就是讓實際的包處理者去處理資料。這樣就回到了StanzeHanler,以ClientStanzaHandler為例子。隻不過這個派生類中沒有重寫process方法,也就是說要看父類的實作:

public void process(String stanza, XMPPPacketReader reader) throws Exception {
 
    boolean initialStream = stanza.startsWith("<stream:stream") || stanza.startsWith("<flash:stream");
    if (!sessionCreated || initialStream) {
        if (!initialStream) {
..........
        // Found an stream:stream tag...
        if (!sessionCreated) {
            sessionCreated = true;
            MXParser parser = reader.getXPPParser();
            parser.setInput(new StringReader(stanza));
            createSession(parser);
        }
..........
        return;
    }
..........
}      

由于代碼較多,我省略了一些代碼。看到這應該明白了吧,對于目前的連接配接沒有建立Openfire的session對象時,會進行建立過程createSession,對于不同的StanzeHandler會有些不一樣,這裡ClientStanzaHandler的實作就是把建立好的session放到本地的LocalClientSession中:

@Override
boolean createSession(String namespace, String serverName, XmlPullParser xpp, Connection connection)
        throws XmlPullParserException {
    if ("jabber:client".equals(namespace)) {
        // The connected client is a regular client so create a ClientSession
        session = LocalClientSession.createSession(serverName, xpp, connection);
        return true;
    }
    return false;
}      

到這一個session算是建立完成了。

叢集下的session

之前一篇關于《Openfire叢集源碼分析》提到了session的一些内容。其中也提到了session是不會向每一台伺服器進行同步複制的,這就有一個問題,如果A使用者先是連接配接了伺服器1,但是接下來的操作又到伺服器2,這不就會造成session無法找到嗎?同樣的問題,如果想要擷取到目前所有的client session怎麼辦?

1、如何在叢集中發消息

對于消息最終還是通過session來發送的,前後代碼太多,就直接看一下sessionManager中的getSession方法吧。

public ClientSession getSession(JID from) {
    // Return null if the JID is null or belongs to a foreign server. If the server is
    // shutting down then serverName will be null so answer null too in this case.
    if (from == null || serverName == null || !serverName.equals(from.getDomain())) {
        return null;
    }
 
    // Initially Check preAuthenticated Sessions
    if (from.getResource() != null) {
        ClientSession session = localSessionManager.getPreAuthenticatedSessions().get(from.getResource());
        if (session != null) {
            return session;
        }
    }
 
    if (from.getResource() == null || from.getNode() == null) {
        return null;
    }
 
    return routingTable.getClientRoute(from);
}      

先是擷取本地的session,如果能找到直接傳回,找不到則跳到routingTable裡擷取用戶端的路由資訊。

@Override
public ClientSession getClientRoute(JID jid) {
    // Check if this session is hosted by this cluster node
    ClientSession session = (ClientSession) localRoutingTable.getRoute(jid.toString());
    if (session == null) {
        // The session is not in this JVM so assume remote
        RemoteSessionLocator locator = server.getRemoteSessionLocator();
        if (locator != null) {
            // Check if the session is hosted by other cluster node
            ClientRoute route = usersCache.get(jid.toString());
            if (route == null) {
                route = anonymousUsersCache.get(jid.toString());
            }
            if (route != null) {
                session = locator.getClientSession(route.getNodeID().toByteArray(), jid);
            }
        }
    }
    return session;
}      

這裡更直接的可以看到,查找本地路由不null則會通過RemoteSessionLocator來完成。當然這裡最大的奧秘其實是usersCache和anonymousUsersCache這兩個cache。之前寫的叢集源碼分析中提過,最終openfire叢集後會對緩存進行同步,這樣每台伺服器上都會有緩存的副本。是以usersCache是擁有所有使用者資訊的,有了user的資訊就有了jid的資訊,這樣不管是哪台伺服器都可以對資料包處理并發送給用戶端。

這裡的RemoteSessionLocator是由于适配不同的叢集元件所抽象的接口,使得加入不同叢集元件提供了透明處理。

2、如何獲利所有的線上使用者

對于擷取所有線上使用者這個功能思路也挺簡單,一樣是找本地所有的緩存。看getSessions的代碼:

public Collection<ClientSession> getSessions() {
        return routingTable.getClientsRoutes(false);
    }      

其實就是通路路由表,因為路由表裡有所有的cache,和擷取單個的session不一樣,需要對所有的路由都周遊傳回。

@Override
public Collection<ClientSession> getClientsRoutes(boolean onlyLocal) {
    // Add sessions hosted by this cluster node
    Collection<ClientSession> sessions = new ArrayList<ClientSession>(localRoutingTable.getClientRoutes());
    if (!onlyLocal) {
        // Add sessions not hosted by this JVM
        RemoteSessionLocator locator = server.getRemoteSessionLocator();
        if (locator != null) {
            // Add sessions of non-anonymous users hosted by other cluster nodes
            for (Map.Entry<String, ClientRoute> entry : usersCache.entrySet()) {
                ClientRoute route = entry.getValue();
                if (!server.getNodeID().equals(route.getNodeID())) {
                    sessions.add(locator.getClientSession(route.getNodeID().toByteArray(), new JID(entry.getKey())));
                }
            }
            // Add sessions of anonymous users hosted by other cluster nodes
            for (Map.Entry<String, ClientRoute> entry : anonymousUsersCache.entrySet()) {
                ClientRoute route = entry.getValue();
                if (!server.getNodeID().equals(route.getNodeID())) {
                    sessions.add(locator.getClientSession(route.getNodeID().toByteArray(), new JID(entry.getKey())));
                }
            }
        }
    }
    return sessions;
}      

總結

對于檢視Openfire的源代碼學習了一些東西,特别是對于服務化系統的開發思路。而且在叢集化上也有了一些認識,知道了多機部署後系統應該要解決哪些問題。

繼續學習吧。

注:此文章為原創,歡迎轉載,請在文章頁面明顯位置給出此文連結!

若您覺得這篇文章還不錯請點選下右下角的推薦,非常感謝!

http://www.cnblogs.com/5207

注:此文章為原創,歡迎轉載,請在文章頁面明顯位置給出此文連結!

繼續閱讀