如果使用者量增加後為了解決吞吐量問題,需要引入叢集,在openfire中提供了叢集的支援,另外也實作了兩個叢集插件:hazelcast和clustering。為了了解情況叢集的工作原理,我就沿着openfire的源代碼進行了分析,也是一次學習的過程。
首先了解叢集的一些簡單概念
叢集的目的是讓多個執行個體像一個執行個體一樣運作,這樣就可以通過增長執行個體來增長計算能力。也就是所謂的分布式計算問題,這其中最為關注的一個特性就是——CAP理論,也就是所謂的一緻性、可用性、分區容錯性。叢集中最核心解決的問題就是CAP。
CAP綜合了解就是我上面寫的,多個執行個體像一個執行個體一樣運作。
是以所謂叢集就是把一些資料共享或者同步到不同的執行個體上,這樣系統使用同樣的算法,取的結果當然應該是相同啦。是以一些資料庫的主從複制,緩存資料叢集都是類似這種解決方法。隻是代碼實作品質和處理規模的問題。
有了這個基礎我們再來看看openfire是怎麼解決這個問題的。
openfire的叢集設計
1、哪些需要進行叢集間的同步
對于openfire而言,有這幾方面的資料需要進行保證叢集間的同步:資料庫存的資料、緩存資料、session。貌似就這些吧?
資料庫
因為對于openfire來說基本上是透明的,是以這塊就交給資料庫本身來實作。
緩存資料
緩存是存在記憶體裡的,是以這部分是要同步的
session
session在openfire并不需要所有執行個體同步,但是需要做使用者路由緩存,否則發消息時找不到對應的會話。由此使用者路由還是要同步的。
2、緩存的設計
- 緩存接口
openfire裡對緩存的資料容器提供了一個包裝接口,這個接口提供了緩存資料的基本方法,用于統一資料操作。
public interface Cache<K,V> extends java.util.Map<K,V>
如果不開啟叢集時緩存的預設緩存容器類是:public class DefaultCache<K, V> ,實際上DefaultCache就是用一個Hashmap來存資料的。
- 緩存工廠類
為了保證緩存是可以擴充的,提供了一個工廠類:
public class CacheFactory
CacheFactory類中會管理所有的緩存容器,如下代碼:
/**
* Returns the named cache, creating it as necessary.
*
* @param name the name of the cache to create.
* @return the named cache, creating it as necessary.
*/
@SuppressWarnings("unchecked")
public static synchronized <T extends Cache> T createCache(String name) {
T cache = (T) caches.get(name);
if (cache != null) {
return cache;
}
cache = (T) cacheFactoryStrategy.createCache(name);
log.info("Created cache [" + cacheFactoryStrategy.getClass().getName() + "] for " + name);
return wrapCache(cache, name);
}
上面代碼中會通過緩存工廠政策對象來建立一個緩存容器,最後warpCache方法會将此容器放入到caches中。
- 緩存工廠類的政策
在CacheFactory中預設是使用一個DefaultLocalCacheStrategy來完成緩存建立的。另外還提供了在叢集條件下的緩存政策接入。也就是通過執行個體化不同的政策來切換緩存管理方案。比如後面要提到的hazelcast就是通過這個來替換了本地緩存政策的。從接口的設計上來看,openfire的緩存政策也就是為了叢集與非叢集的實作。
3、叢集的設計
在openfire中的叢集主要包括:叢集管理、資料同步管理、叢集計算任務。
叢集管理者
在openfire中主要是一個類來實作:ClusterManager,在ClusterManager中實作了叢集執行個體的加入、退出管理,因為沒有使用主從結構,是以ClusterManager實作了一個無中心管理,不知道我了解的對不對。因為隻要目前實執行個體啟用了叢集,ClusterManager就會主動的加載叢集管理并與其他的叢集進行同步。
- startup
startup是啟動叢集的方法,代碼:
public static synchronized void startup() {
if (isClusteringEnabled() && !isClusteringStarted()) {
initEventDispatcher();
CacheFactory.startClustering();
}
}
首先要判斷是否開啟了叢集并且目前叢集執行個體未運作時才去啟動。
先是初始化了事件分發器,用于處理叢集的同步事情。
然後就是調用CacheFactory的startClustering來運作叢集。在startClustering方法中主要是這幾個事情:
- 會使用叢集的緩存工廠政策來啟動,同時使自己加入到叢集中。
- 開啟一個線程用于同步緩存的狀态
在前面startup中的initEventDispatcher方法,在這裡會注冊一個分發線程監聽到叢集事件,收到事件後會執行joinedCluster或者leftCluster的操作,joinedCluster就是加入到叢集中的意思。
在joinedCluster時會将本地的緩存容器都轉換為叢集緩存。由此便完成了叢集的初始化并加入到叢集中了。
- shutdown
shutdown相對簡單點就是退出叢集,并且将緩存工廠恢複為本地緩存。
同步管理
上面主要是講了如何管理叢集,接着比較重要的就是如何在叢集間同步資料呢?這部分主要是看具體的分布式計算系統的實作了,從openfire來說就是将資料放到叢集緩存中,然後通過叢集元件來完成的,比如使用hazelcast。
因為使用緩存來解決,是以在CacheFactory中才會有這些麼多關于叢集的處理代碼,特别是對于緩存政策的切換,以及叢集任務處理都在CacheFactory作為接口方法向外公開。這樣也把叢集的實作透明了。
叢集計算任務
在這之前一直沒有提到叢集中的計算問題,因為既然有了叢集是不是可以利用叢集的優勢進行一些并行計算呢?這部分我倒沒有太過确定,隻是看到相關的代碼是以簡單列一下。
在CacheFactory類中有幾個方法:doClusterTask、doSynchronousClusterTask,這兩個都是overload方法,參數有所不同而已。這幾個方法就是用于執行一些計算任務的。就看一下doClusterTask:
public static void doClusterTask(final ClusterTask<?> task) {
cacheFactoryStrategy.doClusterTask(task);
}
這裡有個限定就是必須是ClusterTask派生的類才行,看看它的定義:
public interface ClusterTask<V> extends Runnable, Externalizable {
V getResult();
}
主要是為了異步執行和序列化,異步是因為不能阻塞,而序列化當然就是為了能在叢集中傳送。
再看CacheFactory的doClusterTask方法可以發現,它隻不過是代理了緩存政策工廠的doClusterTask,具體的實作還是要看叢集實作的。
看一看hazelcast的實作簡單了解openfire叢集
在openfire中有叢集的插件實作,這裡就以hazelcast為例子簡單的做一下分析與學習。
- 緩存政策工廠類(ClusteredCacheFactory)
ClusteredCacheFactory實作了CacheFactoryStrategy,代碼如下:
public class ClusteredCacheFactory implements CacheFactoryStrategy {
首先是startCluster方法用于啟動叢集,主要完成幾件事情:
-
- 設定緩存序列化工具類,ClusterExternalizableUtil。這個是用于叢集間資料複制時的序列化工具
- 設定遠端session定位器,RemoteSessionLocator,因為session不同步,是以它主要是用于多執行個體間的session讀取
- 設定遠端包路由器ClusterPacketRouter,這樣就可以在叢集中發送消息了
- 加載Hazelcast的執行個體設定NodeID,以及設定ClusterListener
在前面說起叢集啟動時提到了緩存切換,那具體實作時是如何做的呢?
因為叢集啟動後就要是CacheFactory.joinedCluster方法來加入叢集的。看一下加入的代碼:
/**
* Notification message indicating that this JVM has joined a cluster.
*/
@SuppressWarnings("unchecked")
public static synchronized void joinedCluster() {
cacheFactoryStrategy = clusteredCacheFactoryStrategy;
// Loop through local caches and switch them to clustered cache (copy content)
for (Cache cache : getAllCaches()) {
// skip local-only caches
if (localOnly.contains(cache.getName())) continue;
CacheWrapper cacheWrapper = ((CacheWrapper) cache);
Cache clusteredCache = cacheFactoryStrategy.createCache(cacheWrapper.getName());
clusteredCache.putAll(cache);
cacheWrapper.setWrappedCache(clusteredCache);
}
clusteringStarting = false;
clusteringStarted = true;
log.info("Clustering started; cache migration complete");
}
這裡可以看到會讀取所有的緩存容器并一個個的使用Wrapper包裝一下,然後用同樣的緩存名稱去createCache一個新的Cache,這步使用的是切換後的叢集緩存政策工廠,也就是說會使用ClusteredCacheFactory去建立新的緩存容器。最後再将cache寫入到新的clusteredCache 裡,這樣就完成了緩存的切換。
當然這裡還是要看一下ClusteredCacheFactory的createCache實作:
public Cache createCache(String name) {
// Check if cluster is being started up
while (state == State.starting) {
// Wait until cluster is fully started (or failed)
try {
Thread.sleep(250);
}
catch (InterruptedException e) {
// Ignore
}
}
if (state == State.stopped) {
throw new IllegalStateException("Cannot create clustered cache when not in a cluster");
}
return new ClusteredCache(name, hazelcast.getMap(name));
}
這裡使用的是ClusteredCache,而且最重要的是傳入的第二個map參數換成了hazelcast的了,這樣之後再通路這個緩存容器時已經不再是原先的本地Cache了,已經是hazelcast的map對象。hazelcast會自動對map的資料進行同步管理,這也就完成了緩存同步的功能。
- 叢集計算
那就看hazelcast的實作吧,在ClusteredCacheFactory中doClusterTask舉個例子吧:
public void doClusterTask(final ClusterTask task) {
if (cluster == null) { return; }
Set<Member> members = new HashSet<Member>();
Member current = cluster.getLocalMember();
for(Member member : cluster.getMembers()) {
if (!member.getUuid().equals(current.getUuid())) {
members.add(member);
}
}
if (members.size() > 0) {
// Asynchronously execute the task on the other cluster members
logger.debug("Executing asynchronous MultiTask: " + task.getClass().getName());
hazelcast.getExecutorService(HAZELCAST_EXECUTOR_SERVICE_NAME).submitToMembers(
new CallableTask<Object>(task), members);
} else {
logger.warn("No cluster members selected for cluster task " + task.getClass().getName());
}
}
過程就是,先擷取到叢集中的執行個體成員,當然要排除自己。然後hazelcast提供了ExecutorService來執行這個task,方法就是submiteToMembers。這樣就送出了一個運算任務。隻不過具體是如何配置設定計算并彙集結果倒真不太清楚。
總結
花了一天時間看了一下openfire的叢集,順手就寫了一篇文章,确實也到了一些東西。和一些網友溝通中好像目前大家更願意使用redies來完成緩存共享,以及通過代理來實作叢集,而不願意使用openfire的叢集方案。這部分我沒有遇到如何大的并發量需求确實不知道差別在哪裡。以後有機會還是動手試試寫一個redies的插件。
原創聲明:本文為原創内容,轉載請注明。http://www.cnblogs.com/5207/p/5705092.html
注:此文章為原創,歡迎轉載,請在文章頁面明顯位置給出此文連結!
若您覺得這篇文章還不錯請點選下右下角的推薦,非常感謝!