去年項目需要看了hazelcast源碼,當時記錄的筆記。
Node是節點的抽象,裡面包含節點引擎、用戶端引擎、分區服務、叢集服務、多點傳播服務、連接配接管理、指令管理、多點傳播屬性、節點配置、本地成員、tcp位址、多點傳播位址、連接配接者、節點初始化器、管理中心、安全上下文、
Config類,包含GroupConfig、NetworkConfig、MapConfig、TopicConfig、QueueConfig、MultiMapConfig、ListConfig、SetConfig、ExecutorConfig、SemaphoreConfig、WanReplicationConfig、ServicesConfig、SecurityConfig、ListenerConfig、PartitionGroupConfig、ManagementCenterConfig、SerializationConfig。
GroupConfig叢集使用者名及密碼DEFAULT_GROUP_PASSWORD = "dev-pass";DEFAULT_GROUP_NAME = "dev";
NetworkConfig網絡相關配置,包括InterfacesConfig、JoinConfig、SSLConfig等,其中JoinConfig包括MulticastConfig、TcpIpConfig、AwsConfig。
SecurityConfig用戶端登陸相關配置。
WanReplicationConfig叢集複制配置,包括WanTargetClusterConfig配置,所有目标節點相關配置。
ClusterServiceImpl用于維護叢集各個成員節點,執行個體化時把本地節點加入成員Map中,MulticastService
Node執行個體化時根據MulticastConfig使用多點傳播加入一個組,MulticastService提供節點基于多點傳播傳遞的,使用監聽模式每當接收到其他節點傳播的消息調用監聽器的onMessage,傳遞的參數為JoinMessage,它由序列化子產品提供轉化,預設NodeMulticastListener監聽器,會對叢集的節點校驗,是否是同個叢集使用者名密碼。
MulticastJoiner用于加入叢集節點,建立JoinRequest對象用MulticastService發送給其他成員,不斷發送加入叢集請求直到node裡面的masteraddress,如果配置裡面指定了targetAddress就不用使用這種不斷發送的方式選舉主節點。MulticastService負責發送申請加入的多點傳播消息群組播消息接收及處理的工作。
找主節點方式:循環發送JoinRequest消息向組内發送,如果已加入狀态且是master的節點接收到後會向外多點傳播JoinMessage,告訴其他節點我的組成員資訊,還沒加入叢集的成員則将自己節點的master位址設定為master發出的JoinMessage中的位址。是以JoinMessage隻有master才會發出。其他已加入組的成員節點接收到JoinMessage類型消息則直接忽略,
JoinMessage包含包版本、位址等資訊。
SerializationService序列化轉化子產品,SerializationServiceBuilder生産者,預設位元組存放順序序列為ByteOrder.BIG_ENDIAN,即使用ByteArrayInputOutputFactory,
Packet為資料包結構,第一位元組為版本号,第二位元組表示為長度,第三位元組表示類型。
SocketConnector會執行連接配接操作,使用的是阻塞讀寫。
TcpIpConnectionManager內建ConnectionManager,用于管理TCPIP連接配接
JoinMessage{
protected byte packetVersion;
protected int buildNumber;
protected Address address;
protected String uuid;
protected ConfigCheck configCheck;
protected int memberCount;
}
JoinRequest繼承JoinMessage,并添加credentials、tryCount屬性。
預設5701作為每個節點對外暴露的端口。
ReadHandler的handle方法會初始化socketReader,叢集成員維護的話使用SocketPacketReader讀取封包,
OperationServiceImpl類内部類RemoteOperationProcessor用于ResponseOperation對象
response.beforeRun();
response.run();
response.afterRun();
即會調用JoinRequestOperation的run方法,調用ClusterServiceImpl裡面的handleJoinRequest方法,完成成員更新工作,接收成功會向源節點傳回SetMasterOperation、MemberInfoUpdateOperation等等更新源節點成員及節點joined狀态。
裡面一切需要處理的都用run方法,會統一處理。
Node.start--TcpIpConnectionManager.start--InSelectorImpl.run(輸入即讀取選擇器,run不斷選擇可讀的對象并擷取attachment,調用它的handle方法,這裡的attachment是在TcpIpConnection執行個體化的時候建立readHandle對象并調用start方法把readHandler當做attachment注冊到socketChannel裡面的,readHandler調用handle方法,如果是叢集協定CLUSTER則執行個體化SocketPacketReader,SocketPacketReader包含一個PacketReader,預設是DefaultPacketReader,用于将套接字讀出來的位元組轉化成Packet包并調用handlePacket方法處理消息包,如果是叢集成員消息包則調用handleMemberPacket方法處理,間接調用NodeEngineImpl的handlePacket方法,有三種不同頭部類型:Packet.HEADER_OP、Packet.HEADER_EVENT、Packet.HEADER_WAN_REPLICATION,HEADER_OP表示操作類型,會調用operationService的handleOperation方法處理包消息,OperationThread會一直跑調用OperationRunner處理需要執行的Operation,OperationRunner線程數由hazelcast.operation.thread.count設定,如果-1則為機器cpu個數,接着會執行operation的beforeRun、run、operation如果設定了響應的話還要執行産生響應内容在傳回給調用者、afterRun方法。)--OutSelectorImpl.run(輸出即寫入選擇器)--SocketAcceptor.run--TcpIpConnection.start(TcpIpConnection包括WriteHandler和ReadHandler)--
private void process(Object task) {
processedCount++;
if (task instanceof Operation) {
processOperation((Operation) task);
return;
}
if (task instanceof Packet) {
processPacket((Packet) task);
if (task instanceof PartitionSpecificRunnable) {
processPartitionSpecificRunnable((PartitionSpecificRunnable) task);
throw new IllegalStateException("Unhandled task type for task:" + task);
}
設計得不錯的是它直接支援各種Operation的傳遞,并執行,本質也是序列化反序列化,然後調用Operation的beforerun、run、afterrun方法,後面還會自動執行handleResponse方法,此方法用于向其他備份節點同步資料,這部分操作是在operation的afterrun之前完成備份。備份工作由OperationBackupHandler完成,backup方法,備份又分為同步備份和異步備份,相加等于總的需要備份數。map操作預設備份一份資料且是同步的,異步的預設為0.異步備份不會阻塞操作。備份的operation是Backup,它的run會執行PutOperation的run方法,即把資料放到緩存中并修改版本,這裡的run不會再執行複制操作。
<hazelcast>
<map name="default">
<backup-count>1</backup-count>
<async-backup-count>1</async-backup-count>
</map>
</hazelcast>
operation的run同樣會在備份節點上執行,putoperation其實就是在本地緩存更新值。備份的過程沒有一個ack機制,資訊傳輸的可靠性如何保證?
一個節點調用map的put操作時,會在本節點上緩存這個結果,再把operation傳輸到對應partition的第一個備份節點(這個節點可能就是自己本地節點)上,第一個節點接收到後備份到第二個節點上,是以預設就隻有兩個備份資料。是以nearcache緩存是可能存在每個節點上的。
PutOperation的afterRun方法主要是觸發一些攔截器,觸發各個節點的事件監聽器什麼的、更新各個備份節點緩存等等。
IOBalancer平衡io讀取寫入。
packet封包結構,1byte版本+2byte頭部+4byte分區+4byte長度+nbyte消息。
主節點一個一個發給其他成員節點關于成員的消息,其他節點進行更新。
ServiceManagerImpl用于管理所有的遠端,啟動時會注冊常用的一些服務,包括如下的服務,什麼map、queue什麼的
private void registerCoreServices() {
Node node = nodeEngine.getNode();
registerService(ClusterServiceImpl.SERVICE_NAME, node.getClusterService());
registerService(InternalPartitionService.SERVICE_NAME, node.getPartitionService());
registerService(ProxyServiceImpl.SERVICE_NAME, nodeEngine.getProxyService());
registerService(TransactionManagerServiceImpl.SERVICE_NAME, nodeEngine.getTransactionManagerService());
registerService(ClientEngineImpl.SERVICE_NAME, node.clientEngine);
registerService(QuorumServiceImpl.SERVICE_NAME, nodeEngine.getQuorumService());
private void registerDefaultServices(ServicesConfig servicesConfig) {
registerService(MapService.SERVICE_NAME, createService(MapService.class));
registerService(LockService.SERVICE_NAME, new LockServiceImpl(nodeEngine));
registerService(QueueService.SERVICE_NAME, new QueueService(nodeEngine));
registerService(TopicService.SERVICE_NAME, new TopicService());
registerService(ReliableTopicService.SERVICE_NAME, new ReliableTopicService(nodeEngine));
registerService(MultiMapService.SERVICE_NAME, new MultiMapService(nodeEngine));
registerService(ListService.SERVICE_NAME, new ListService(nodeEngine));
registerService(SetService.SERVICE_NAME, new SetService(nodeEngine));
registerService(DistributedExecutorService.SERVICE_NAME, new DistributedExecutorService());
registerService(AtomicLongService.SERVICE_NAME, new AtomicLongService());
registerService(AtomicReferenceService.SERVICE_NAME, new AtomicReferenceService());
registerService(CountDownLatchService.SERVICE_NAME, new CountDownLatchService());
registerService(SemaphoreService.SERVICE_NAME, new SemaphoreService(nodeEngine));
registerService(IdGeneratorService.SERVICE_NAME, new IdGeneratorService(nodeEngine));
registerService(MapReduceService.SERVICE_NAME, new MapReduceService(nodeEngine));
registerService(ReplicatedMapService.SERVICE_NAME, new ReplicatedMapService(nodeEngine));
registerService(RingbufferService.SERVICE_NAME, new RingbufferService(nodeEngine));
registerService(XAService.SERVICE_NAME, new XAService(nodeEngine));
registerCacheServiceIfAvailable();
Map通過AbstractMapServiceFactory建立,使用MapRemoteService處理遠端操作,RemoteService服務有兩個方法createDistributedObject和destroyDistributedObject方法,最終是通過MapProxyImpl。Map的partition政策在MapContainer裡面。
PutOperation用于執行叢集節點的put操作。一緻性哈希根據key使用MurmurHash哈希計算出結果,再根據分區數(預設271)取餘。每個節點都是使用new ConcurrentHashMap<Data, Record>(1000, 0.75f, 1)存放記錄。
成員組MemberGroup一共有若幹個組,多少個成員就多少個組,最大的複制節點數為7,如果成員組小于7則使用成員組數量。
address[271][4]
Address[271][7]
1、addr0 addr9 addr3 addr4 null null null
2、.
.
271、addr1 addr2 addr3 addr4 null null null
對配置設定的結果嘗試重新配置設定,把過載的組配置設定一些成員給不足的組,并檢測每個成員組内的節點數相差不會超過系數1.1,否則重新分組,盡可能達到均勻。
最終形成一張表,271個分區每個分區都對應着若幹個複制的成員位址。
int avgPartitionPerGroup = partitionCount / groupSize;
就是說最多4個組,假如5個結點,分組情況為2,1,1,1,則每個組分到的partition個數為271/4,可能有些組多1個partition。
一共有271條線程處理operation,OperationThread,裡面有隊列ScheduleQueue,線程會不斷處理,ScheduleQueue用于operation執行緩沖隊列,裡面的有兩種隊列,normalQueue和priorityQueue,一種用于正常的排隊,一種用于設定優先級的隊列,take方法會優先從priorityQueue中擷取需要優先處理的operation。
有個同步複制、異步複制。
OperationServiceImpl.createInvocationBuilder(此方法有兩個,一個用于partition、一個用于指定的address),-> 執行個體化一個InvocationBuilderImpl對象,調用invoke方法會建立Invocation,Invocation包含PartitionInvocation(針對分區)和TargetInvocation(針對指定節點)兩種。
在叢集中傳輸一切以Data形式傳輸。
Map的服務名為hz:impl:mapService。
假如HazelcastInstance執行instance.getMap("customers")則,通過HazelcastInstanceProxy的getMap方法,代理是調HazelcastInstanceImpl的getMap方法,調用getDistributedObject方法,它會通過ProxyService(它代理了所有服務)代理找到MapService,調用mapservice的createDistributedObject方法建立DistributedObject,間接調用MapRemoteService的createDistributedObject方法建立MapProxyImpl,調用MapProxyImpl的put方法把資料放到叢集,主要如下操作
final Data key = toData(k, partitionStrategy);
final Data value = toData(v);
final Data result = putInternal(key, value, ttl, timeunit);
return (V) toObject(result);
分别将key和value轉化為Data,其實是序列化,友善網絡傳輸。putInternal方法使用PutOperation,并且要根據key計算出partitionId,接着再完成operation的調用。
SerializationServiceImpl提供各種類型的序列化支援,toData提供由object到Data的轉化,toObject提供由Data到object的轉化,Data預設是使用DefaultData,DefaultData裡面其實就是包含了一個位元組數組還有不同的偏移量,例如從幾位到幾位表示類型,序列化工作其實也跟這種類似,把某一對象的相關資訊轉化為位元組數組,傳遞到目的地後再根據約定反向組裝成指定對象。
OperationThread專門用于執行接收到的請求,process方法,可以有三種請求,包括Operation、Packet、PartitionSpecificRunnable,分别不同的處理邏輯,Operation則直接反序列化後調用beforeRun、run、afterRun等方法。
HealthMonitor是獨立一條線程用于監控健康,包括
private class HealthMetrics {
private final long memoryFree;
private final long memoryTotal;
private final long memoryUsed;
private final long memoryMax;
private final double memoryUsedOfTotalPercentage;
private final double memoryUsedOfMaxPercentage;
//following three load variables are always between 0 and 100.
private final double processCpuLoad;
private final double systemLoadAverage;
private final double systemCpuLoad;
private final int threadCount;
private final int peakThreadCount;
private final long clusterTimeDiff;
private final int asyncExecutorQueueSize;
private final int clientExecutorQueueSize;
private final int queryExecutorQueueSize;
private final int scheduledExecutorQueueSize;
private final int systemExecutorQueueSize;
private final int eventQueueSize;
private final int pendingInvocationsCount;
private final double pendingInvocationsPercentage;
private final int operationServiceOperationExecutorQueueSize;
private final int operationServiceOperationPriorityExecutorQueueSize;
private final int operationServiceOperationResponseQueueSize;
private final int runningOperationsCount;
private final int remoteOperationsCount;
private final int proxyCount;
private final int clientEndpointCount;
private final int activeConnectionCount;
private final int currentClientConnectionCount;
private final int connectionCount;
private final int ioExecutorQueueSize;
PerformanceMonitor表示性能監控,監控的參數包括inSelector的已讀取數量,outSelect的已寫入事件數量,OperationService相關的性能參數,例如挂起的調用比例、總體調用比例、最大調用數量、271個分區線程已執行數量、271個分區operation線程正在挂起線程(即任務排隊隊列中的數量),正常operation線程的排隊隊列數量、正常operation線程已執行數量、響應線程已執行數量、響應線程排隊隊列數量。
hazelcast核心——Node,包含各種各樣重要的基礎服務,日志、節點關閉鈎子、序列化服務、節點引擎、用戶端引擎、分區服務、叢集服務、廣播服務、連接配接管理服務、指令服務、配置檔案服務、群組屬性服務、本節點位址、本地叢集成員對象、主節點位址、hazelcast執行個體引用、日志服務、叢集節點加入服務、節點擴充服務、管理中心服務、安全上下文、建立資訊服務、版本校對服務、hazelcast線程組。
public class Node {
private final ILogger logger;
private final NodeShutdownHookThread shutdownHookThread = new NodeShutdownHookThread("hz.ShutdownThread");
private final SerializationService serializationService;
public final NodeEngineImpl nodeEngine;
public final ClientEngineImpl clientEngine;
public final InternalPartitionService partitionService;
public final ClusterServiceImpl clusterService;
public final MulticastService multicastService;
public final ConnectionManager connectionManager;
public final TextCommandServiceImpl textCommandService;
public final Config config;
public final GroupProperties groupProperties;
public final Address address;
public final MemberImpl localMember;
private volatile Address masterAddress = null;
public final HazelcastInstanceImpl hazelcastInstance;
public final LoggingServiceImpl loggingService;
private final Joiner joiner;
private final NodeExtension nodeExtension;
private ManagementCenterService managementCenterService;
public final SecurityContext securityContext;
private final ClassLoader configClassLoader;
private final BuildInfo buildInfo;
private final VersionCheck versionCheck = new VersionCheck();
private final HazelcastThreadGroup hazelcastThreadGroup;
NodeEngineImpl作為節點引擎包含了許多服務,重要的例如,事件服務、operation服務、執行服務、等待通知服務、service(内置許多service,例如Map、Queue,使用者自定義的服務可配置到hazelcast.xml,啟動時會加載進來)管理服務、事務管理服務、代理服務、wan複制服務、包傳輸服務、證明人服務。
public class NodeEngineImpl implements NodeEngine {
private final Node node;
private final EventServiceImpl eventService;
private final OperationServiceImpl operationService;
private final ExecutionServiceImpl executionService;
private final WaitNotifyServiceImpl waitNotifyService;
private final ServiceManagerImpl serviceManager;
private final TransactionManagerServiceImpl transactionManagerService;
private final ProxyServiceImpl proxyService;
private final WanReplicationService wanReplicationService;
private final PacketTransceiver packetTransceiver;
private final QuorumServiceImpl quorumService;
ServiceManagerImpl用于管理所有服務,啟動時預設會執行個體化核心的service、預設的service,如果使用者通過配置檔案配置了自定義service則也會執行個體化。啟動時注冊即将service執行個體put到ConcurrentMap中,核心service包括ClusterServiceImpl、InternalPartitionService、ProxyServiceImpl、TransactionManagerServiceImpl、ClientEngineImpl、QuorumServiceImpl。預設service包括MapService、LockService、QueueService、TopicService、ReliableTopicService、MultiMapService、ListService、SetService、DistributedExecutorService、AtomicLongService、AtomicReferenceService、CountDownLatchService、SemaphoreService、IdGeneratorService、MapReduceService、ReplicatedMapService、RingbufferService、XAService。如果允許還将把緩存服務CacheService添加進來。
public final class ServiceManagerImpl implements ServiceManager {
private final ConcurrentMap<String, ServiceInfo> services = new ConcurrentHashMap<String, ServiceInfo>(20, .75f, 1);
節點加入,Joiner負責加入工作,例如廣播則使用MulticastJoiner、單點傳播則使用TcpIpJoiner、AWS則使用TcpIpJoinerOverAWS。Node啟動時會根據情況啟動個線程,
multicast隻是做節點發現工作,真正的節點加入工作是交由tcpip做,向主節點發送加入請求,主節點把請求節點添加到成員清單中,然後傳回請求節點讓它把主節點位址設定為本人。
DefaultSerializers包括DateSerializer、ObjectSerializer、ClassSerializer等等序列化器,實作StreamSerializer的read和write方法完成序列化和反序列化處理。
Date.class, new DateSerializer());
BigInteger.class, new BigIntegerSerializer());
BigDecimal.class, new BigDecimalSerializer());
Externalizable.class, new Externalizer(enableCompression));
Serializable.class, new ObjectSerializer(enableSharedObject, enableCompression));
Class.class, new ClassSerializer());
Enum.class, new EnumSerializer());
DataSerializable.class, dataSerializerAdapter);
Portable.class, portableSerializerAdapter);
Byte.class, new ByteSerializer());
Boolean.class, new BooleanSerializer());
Character.class, new CharSerializer());
Short.class, new ShortSerializer());
Integer.class, new IntegerSerializer());
Long.class, new LongSerializer());
Float.class, new FloatSerializer());
Double.class, new DoubleSerializer());
byte[].class, new TheByteArraySerializer());
char[].class, new CharArraySerializer());
short[].class, new ShortArraySerializer());
int[].class, new IntegerArraySerializer());
long[].class, new LongArraySerializer());
float[].class, new FloatArraySerializer());
double[].class, new DoubleArraySerializer());
String.class, new StringSerializer());
每個service都有自己的context,例如mapservice的MapServiceContext,它裡面是保留了一份partition映射表的副本,在底層完成遷移之前并不會更新,當然底層的map資料也不會一邊遷移一邊删除,而是複制一份進行删除
==========廣告時間==========
鄙人的新書《Tomcat核心設計剖析》已經在京東預售了,有需要的朋友可以到 https://item.jd.com/12185360.html 進行預定。感謝各位朋友。
=========================