說明
本文源碼基于Openfire4.0.2。
Openfire的啟動
Openfire的啟動過程非常的簡單,通過一個入口初始化lib目錄下的openfire.jar包,并啟動一個XMPPServer執行個體。
下面就是ServerStarter.start方法的代碼片斷:
Class containerClass = loader.loadClass("org.jivesoftware.openfire.XMPPServer");
containerClass.newInstance();
這樣一個openfire執行個體就已經啟動了。
XMPPServer類
這個XmppServer類是單執行個體的對象,這樣在伺服器調用時可以擷取一個執行個體。既然是個對象就會有構造的過程,XMPPServer在構造過程中會對服務進行初始化,這個過程包括:
- 初始化配置參數
- 檢查是否需要安裝
- 初始化Module
- 啟動統計子產品
- 啟動plugin
基本就是這麼簡單,還是非常簡潔明了。這裡也可以大概知道在openfire裡主要是module和plugin兩類子產品,一般情況下内部的子產品都用module,對于一些功能的擴充或者第三方的開發擴充使用Plugin。官方其實也會自己寫一個插件來擴充功能,說明插件還是比較靈活的。
提一提Module的加載過程
下面代碼是module的加載過程
if (!setupMode) {
verifyDataSource();
// First load all the modules so that modules may access other modules while
// being initialized
loadModules();
// Initize all the modules
initModules();
// Start all the modules
startModules();
}
可以看到,分了三個步驟:
加載子產品:是對子產品類的執行個體化過程,就是建立對象
初始化子產品:就是調用module.initialize(this);,其實就是調用子產品的初始化方法
啟動子產品:module.start();,同理就是調用啟動子產品
這是因為openfire規範了module的接口抽象Module,所有的子產品都要按照這個規範實作,看代碼:
public interface Module {
/**
* Returns the name of the module for display in administration interfaces.
*
* @return The name of the module.
*/
String getName();
/**
* Initialize the module with the container.
* Modules may be initialized and never started, so modules
* should be prepared for a call to destroy() to follow initialize().
*
* @param server the server hosting this module.
*/
void initialize(XMPPServer server);
/**
* Start the module (must return quickly). Any long running
* operations should spawn a thread and allow the method to return
* immediately.
*/
void start();
/**
* Stop the module. The module should attempt to free up threads
* and prepare for either another call to initialize (reconfigure the module)
* or for destruction.
*/
void stop();
/**
* Module should free all resources and prepare for deallocation.
*/
void destroy();
}
這也标示了Module的生命周期,Openfire會管理這些Module的生命周期,以此來保證各個子產品的啟動與釋放。
Connection管理子產品
整個啟動過程有點奇怪,并沒有看到Openfire是如何監聽端口的,如果不監聽如何獲利用戶端連接配接呢?因為Openfire隻通過Module來管理的,那麼對應的網絡管理應該就在Module中。于是在XMPPServer.loadModules方法中看到下面的代碼:
// Load this module always last since we don't want to start listening for clients
// before the rest of the modules have been started
loadModule(ConnectionManagerImpl.class.getName());
ConnectionManagerImpl就是連接配接的管理子產品,這裡有個注釋,就是在其他子產品啟動後之後再啟動監聽子產品。
在ConnectionManagerImpl中管理了主要的連接配接,都是以ConnectionListener的來管理,這個類用于包裝連接配接。我的了解就是一個連接配接抽象吧,這樣對于代碼來說寫起來比較統一。看下面代碼中Manager管理着哪些:
private final ConnectionListener clientListener;
private final ConnectionListener clientSslListener;
private final ConnectionListener boshListener;
private final ConnectionListener boshSslListener;
private final ConnectionListener serverListener;
private final ConnectionListener componentListener;
private final ConnectionListener componentSslListener;
private final ConnectionListener connectionManagerListener; // Also known as 'multiplexer'
private final ConnectionListener connectionManagerSslListener; // Also known as 'multiplexer'
private final ConnectionListener webAdminListener;
private final ConnectionListener webAdminSslListener;
這裡面除了server隻有一個外,其他的都是兩個,其中一個是SSL的。它們主要是什麼連結?
- client:表示用戶端連接配接
- bosh:就是HTTP綁定的連接配接
- server:伺服器到伺服器的socket連接配接
- component:元件到伺服器的連接配接
- connectionManager:是指通過connectionManager連接配接器過來的連接配接
- webAdmin:是指web控制台的連接配接
這裡面bosh和webAdmin使用的是http協定,是以連接配接并不是長連接配接,其他的都是socket。
openfire裡使用了Mina來實作socket網絡處理。隻不過看代碼中對于S2S類型的連接配接使用的不是mina,如下代碼:
if ( getType() == ConnectionType.SOCKET_S2S )
{
connectionAcceptor = new LegacyConnectionAcceptor( generateConnectionConfiguration() );
}
else
{
connectionAcceptor = new MINAConnectionAcceptor( generateConnectionConfiguration() );
}
LegacyConnectionAcceptor是個廢棄的類,但不知道為什麼s2s還要用這個呢?看了看實作,LegacyConnectionAcceptor就是起了一個線程,線上程裡建了一個ServerSocket。可能以後還是會遷移這部分代碼吧。
在connectionAcceptor中會根據類型建立一個ConnectionHandler用于實作具體的業務功能,而ConnectionHandler都是基于org.apache.mina.core.service.IoHandlerAdapter派生的類,而IoHandlerAdapter又是IoHandler的适配接口,是以實質上就是IoHandler。下面是類繼承關系:
在這些Handler裡完成的主要是每個連接配接打開、關閉和資料收發等操作的處理。而其中比較關鍵的一個步驟就是在sessionOpened中設定了StanzeHandler,而每種ConnectionHandler都有自己的StanzeHandler實作。以ClientConnectionHandler為例子,其中ClientConnectionHandler複寫了父類的createStanzaHandler方法,這裡面
@Override
StanzaHandler createStanzaHandler(NIOConnection connection) {
return new ClientStanzaHandler(XMPPServer.getInstance().getPacketRouter(), connection);
}
這裡使用的是clientStanzaHandler,表示是用戶端的資料節處理者。而最終的createStanzaHandler調用是在父類ConnectionHandler的sessionOpened完成的,
@Override
public void sessionOpened(IoSession session) throws Exception {
// Create a new XML parser for the new connection. The parser will be used by the XMPPDecoder filter.
final XMLLightweightParser parser = new XMLLightweightParser(StandardCharsets.UTF_8);
session.setAttribute(XML_PARSER, parser);
// Create a new NIOConnection for the new session
final NIOConnection connection = createNIOConnection(session);
session.setAttribute(CONNECTION, connection);
session.setAttribute(HANDLER, createStanzaHandler(connection));
// Set the max time a connection can be idle before closing it. This amount of seconds
// is divided in two, as Openfire will ping idle clients first (at 50% of the max idle time)
// before disconnecting them (at 100% of the max idle time). This prevents Openfire from
// removing connections without warning.
final int idleTime = getMaxIdleTime() / 2;
if (idleTime > 0) {
session.getConfig().setIdleTime(IdleStatus.READER_IDLE, idleTime);
}
}
這樣每一個session在打開時都會設定handler,而具體的handler由各個派生類建立傳回。這裡的StanzHandler就是Openfire裡的資料包處理單元。和connection類型一樣,包處理也是對應的幾個類:
注:
關于openfire與mina的關系可以看看下面的文章,但是版本相對比較老些,代碼有些不同,隻不過思路差不多:
http://blog.csdn.net/huwenfeng_2011/article/details/43413009
Session子產品
對于Openfire來說一個比較重要的功能就是管理session,因為要與用戶端實時的進行資料通訊,是以必須保持着連接配接。在Openfire中對于Session的管理都集中在SessionManager子產品。但在前面說到連接配接管理時已經知道了IoSession的建立過程,但并沒有看到openfire是如何管理它的。接着ConnectionHandler和StanzaHandler就能知道其中有奧秘。
前面知道了ConnectionHandler是連接配接的處理者,這裡會有連接配接的建立、關閉、資料收發的處理,回到ConnectionHandler這個抽象類中。對于建立時(sessionOpend)主要是建立了StanzaHandler,這樣就把資料包的處理委托給了StzanzHandler(派生類)。但是這個時候并沒有将session放入到openfire的session管理子產品中,而是在用戶端發送資料過來後才開始的。
先看看ConnectionHandler的messageReceived方法:
@Override
public void messageReceived(IoSession session, Object message) throws Exception {
// Get the stanza handler for this session
StanzaHandler handler = (StanzaHandler) session.getAttribute(HANDLER);
// Get the parser to use to process stanza. For optimization there is going
// to be a parser for each running thread. Each Filter will be executed
// by the Executor placed as the first Filter. So we can have a parser associated
// to each Thread
final XMPPPacketReader parser = PARSER_CACHE.get();
// Update counter of read btyes
updateReadBytesCounter(session);
//System.out.println("RCVD: " + message);
// Let the stanza handler process the received stanza
try {
handler.process((String) message, parser);
} catch (Exception e) {
Log.error("Closing connection due to error while processing message: " + message, e);
final Connection connection = (Connection) session.getAttribute(CONNECTION);
if ( connection != null ) {
connection.close();
}
}
}
在接收到資料包後擷取到StanzaHandler,然後調用了它的process方法,也就是讓實際的包處理者去處理資料。這樣就回到了StanzeHanler,以ClientStanzaHandler為例子。隻不過這個派生類中沒有重寫process方法,也就是說要看父類的實作:
public void process(String stanza, XMPPPacketReader reader) throws Exception {
boolean initialStream = stanza.startsWith("<stream:stream") || stanza.startsWith("<flash:stream");
if (!sessionCreated || initialStream) {
if (!initialStream) {
..........
// Found an stream:stream tag...
if (!sessionCreated) {
sessionCreated = true;
MXParser parser = reader.getXPPParser();
parser.setInput(new StringReader(stanza));
createSession(parser);
}
..........
return;
}
..........
}
由于代碼較多,我省略了一些代碼。看到這應該明白了吧,對于目前的連接配接沒有建立Openfire的session對象時,會進行建立過程createSession,對于不同的StanzeHandler會有些不一樣,這裡ClientStanzaHandler的實作就是把建立好的session放到本地的LocalClientSession中:
@Override
boolean createSession(String namespace, String serverName, XmlPullParser xpp, Connection connection)
throws XmlPullParserException {
if ("jabber:client".equals(namespace)) {
// The connected client is a regular client so create a ClientSession
session = LocalClientSession.createSession(serverName, xpp, connection);
return true;
}
return false;
}
到這一個session算是建立完成了。
叢集下的session
之前一篇關于《Openfire叢集源碼分析》提到了session的一些内容。其中也提到了session是不會向每一台伺服器進行同步複制的,這就有一個問題,如果A使用者先是連接配接了伺服器1,但是接下來的操作又到伺服器2,這不就會造成session無法找到嗎?同樣的問題,如果想要擷取到目前所有的client session怎麼辦?
1、如何在叢集中發消息
對于消息最終還是通過session來發送的,前後代碼太多,就直接看一下sessionManager中的getSession方法吧。
public ClientSession getSession(JID from) {
// Return null if the JID is null or belongs to a foreign server. If the server is
// shutting down then serverName will be null so answer null too in this case.
if (from == null || serverName == null || !serverName.equals(from.getDomain())) {
return null;
}
// Initially Check preAuthenticated Sessions
if (from.getResource() != null) {
ClientSession session = localSessionManager.getPreAuthenticatedSessions().get(from.getResource());
if (session != null) {
return session;
}
}
if (from.getResource() == null || from.getNode() == null) {
return null;
}
return routingTable.getClientRoute(from);
}
先是擷取本地的session,如果能找到直接傳回,找不到則跳到routingTable裡擷取用戶端的路由資訊。
@Override
public ClientSession getClientRoute(JID jid) {
// Check if this session is hosted by this cluster node
ClientSession session = (ClientSession) localRoutingTable.getRoute(jid.toString());
if (session == null) {
// The session is not in this JVM so assume remote
RemoteSessionLocator locator = server.getRemoteSessionLocator();
if (locator != null) {
// Check if the session is hosted by other cluster node
ClientRoute route = usersCache.get(jid.toString());
if (route == null) {
route = anonymousUsersCache.get(jid.toString());
}
if (route != null) {
session = locator.getClientSession(route.getNodeID().toByteArray(), jid);
}
}
}
return session;
}
這裡更直接的可以看到,查找本地路由不null則會通過RemoteSessionLocator來完成。當然這裡最大的奧秘其實是usersCache和anonymousUsersCache這兩個cache。之前寫的叢集源碼分析中提過,最終openfire叢集後會對緩存進行同步,這樣每台伺服器上都會有緩存的副本。是以usersCache是擁有所有使用者資訊的,有了user的資訊就有了jid的資訊,這樣不管是哪台伺服器都可以對資料包處理并發送給用戶端。
這裡的RemoteSessionLocator是由于适配不同的叢集元件所抽象的接口,使得加入不同叢集元件提供了透明處理。
2、如何獲利所有的線上使用者
對于擷取所有線上使用者這個功能思路也挺簡單,一樣是找本地所有的緩存。看getSessions的代碼:
public Collection<ClientSession> getSessions() {
return routingTable.getClientsRoutes(false);
}
其實就是通路路由表,因為路由表裡有所有的cache,和擷取單個的session不一樣,需要對所有的路由都周遊傳回。
@Override
public Collection<ClientSession> getClientsRoutes(boolean onlyLocal) {
// Add sessions hosted by this cluster node
Collection<ClientSession> sessions = new ArrayList<ClientSession>(localRoutingTable.getClientRoutes());
if (!onlyLocal) {
// Add sessions not hosted by this JVM
RemoteSessionLocator locator = server.getRemoteSessionLocator();
if (locator != null) {
// Add sessions of non-anonymous users hosted by other cluster nodes
for (Map.Entry<String, ClientRoute> entry : usersCache.entrySet()) {
ClientRoute route = entry.getValue();
if (!server.getNodeID().equals(route.getNodeID())) {
sessions.add(locator.getClientSession(route.getNodeID().toByteArray(), new JID(entry.getKey())));
}
}
// Add sessions of anonymous users hosted by other cluster nodes
for (Map.Entry<String, ClientRoute> entry : anonymousUsersCache.entrySet()) {
ClientRoute route = entry.getValue();
if (!server.getNodeID().equals(route.getNodeID())) {
sessions.add(locator.getClientSession(route.getNodeID().toByteArray(), new JID(entry.getKey())));
}
}
}
}
return sessions;
}
總結
對于檢視Openfire的源代碼學習了一些東西,特别是對于服務化系統的開發思路。而且在叢集化上也有了一些認識,知道了多機部署後系統應該要解決哪些問題。
繼續學習吧。
注:此文章為原創,歡迎轉載,請在文章頁面明顯位置給出此文連結!
若您覺得這篇文章還不錯請點選下右下角的推薦,非常感謝!
http://www.cnblogs.com/5207
注:此文章為原創,歡迎轉載,請在文章頁面明顯位置給出此文連結!