天天看點

分布式記憶體網格Hazelcast源碼導讀

去年項目需要看了hazelcast源碼,當時記錄的筆記。

Node是節點的抽象,裡面包含節點引擎、用戶端引擎、分區服務、叢集服務、多點傳播服務、連接配接管理、指令管理、多點傳播屬性、節點配置、本地成員、tcp位址、多點傳播位址、連接配接者、節點初始化器、管理中心、安全上下文、

Config類,包含GroupConfig、NetworkConfig、MapConfig、TopicConfig、QueueConfig、MultiMapConfig、ListConfig、SetConfig、ExecutorConfig、SemaphoreConfig、WanReplicationConfig、ServicesConfig、SecurityConfig、ListenerConfig、PartitionGroupConfig、ManagementCenterConfig、SerializationConfig。

GroupConfig叢集使用者名及密碼DEFAULT_GROUP_PASSWORD = "dev-pass";DEFAULT_GROUP_NAME = "dev";

NetworkConfig網絡相關配置,包括InterfacesConfig、JoinConfig、SSLConfig等,其中JoinConfig包括MulticastConfig、TcpIpConfig、AwsConfig。

SecurityConfig用戶端登陸相關配置。

WanReplicationConfig叢集複制配置,包括WanTargetClusterConfig配置,所有目标節點相關配置。

ClusterServiceImpl用于維護叢集各個成員節點,執行個體化時把本地節點加入成員Map中,MulticastService

Node執行個體化時根據MulticastConfig使用多點傳播加入一個組,MulticastService提供節點基于多點傳播傳遞的,使用監聽模式每當接收到其他節點傳播的消息調用監聽器的onMessage,傳遞的參數為JoinMessage,它由序列化子產品提供轉化,預設NodeMulticastListener監聽器,會對叢集的節點校驗,是否是同個叢集使用者名密碼。

MulticastJoiner用于加入叢集節點,建立JoinRequest對象用MulticastService發送給其他成員,不斷發送加入叢集請求直到node裡面的masteraddress,如果配置裡面指定了targetAddress就不用使用這種不斷發送的方式選舉主節點。MulticastService負責發送申請加入的多點傳播消息群組播消息接收及處理的工作。

找主節點方式:循環發送JoinRequest消息向組内發送,如果已加入狀态且是master的節點接收到後會向外多點傳播JoinMessage,告訴其他節點我的組成員資訊,還沒加入叢集的成員則将自己節點的master位址設定為master發出的JoinMessage中的位址。是以JoinMessage隻有master才會發出。其他已加入組的成員節點接收到JoinMessage類型消息則直接忽略,

JoinMessage包含包版本、位址等資訊。

SerializationService序列化轉化子產品,SerializationServiceBuilder生産者,預設位元組存放順序序列為ByteOrder.BIG_ENDIAN,即使用ByteArrayInputOutputFactory,

Packet為資料包結構,第一位元組為版本号,第二位元組表示為長度,第三位元組表示類型。

SocketConnector會執行連接配接操作,使用的是阻塞讀寫。

TcpIpConnectionManager內建ConnectionManager,用于管理TCPIP連接配接

JoinMessage{

protected byte packetVersion;

    protected int buildNumber;

    protected Address address;

    protected String uuid;

    protected ConfigCheck configCheck;

    protected int memberCount;

}

JoinRequest繼承JoinMessage,并添加credentials、tryCount屬性。

預設5701作為每個節點對外暴露的端口。

ReadHandler的handle方法會初始化socketReader,叢集成員維護的話使用SocketPacketReader讀取封包,

OperationServiceImpl類内部類RemoteOperationProcessor用于ResponseOperation對象

response.beforeRun();

response.run();

response.afterRun();

即會調用JoinRequestOperation的run方法,調用ClusterServiceImpl裡面的handleJoinRequest方法,完成成員更新工作,接收成功會向源節點傳回SetMasterOperation、MemberInfoUpdateOperation等等更新源節點成員及節點joined狀态。

裡面一切需要處理的都用run方法,會統一處理。

Node.start--TcpIpConnectionManager.start--InSelectorImpl.run(輸入即讀取選擇器,run不斷選擇可讀的對象并擷取attachment,調用它的handle方法,這裡的attachment是在TcpIpConnection執行個體化的時候建立readHandle對象并調用start方法把readHandler當做attachment注冊到socketChannel裡面的,readHandler調用handle方法,如果是叢集協定CLUSTER則執行個體化SocketPacketReader,SocketPacketReader包含一個PacketReader,預設是DefaultPacketReader,用于将套接字讀出來的位元組轉化成Packet包并調用handlePacket方法處理消息包,如果是叢集成員消息包則調用handleMemberPacket方法處理,間接調用NodeEngineImpl的handlePacket方法,有三種不同頭部類型:Packet.HEADER_OP、Packet.HEADER_EVENT、Packet.HEADER_WAN_REPLICATION,HEADER_OP表示操作類型,會調用operationService的handleOperation方法處理包消息,OperationThread會一直跑調用OperationRunner處理需要執行的Operation,OperationRunner線程數由hazelcast.operation.thread.count設定,如果-1則為機器cpu個數,接着會執行operation的beforeRun、run、operation如果設定了響應的話還要執行産生響應内容在傳回給調用者、afterRun方法。)--OutSelectorImpl.run(輸出即寫入選擇器)--SocketAcceptor.run--TcpIpConnection.start(TcpIpConnection包括WriteHandler和ReadHandler)--

private void process(Object task) {

        processedCount++;

        if (task instanceof Operation) {

            processOperation((Operation) task);

            return;

        }

        if (task instanceof Packet) {

            processPacket((Packet) task);

        if (task instanceof PartitionSpecificRunnable) {

            processPartitionSpecificRunnable((PartitionSpecificRunnable) task);

        throw new IllegalStateException("Unhandled task type for task:" + task);

    }

設計得不錯的是它直接支援各種Operation的傳遞,并執行,本質也是序列化反序列化,然後調用Operation的beforerun、run、afterrun方法,後面還會自動執行handleResponse方法,此方法用于向其他備份節點同步資料,這部分操作是在operation的afterrun之前完成備份。備份工作由OperationBackupHandler完成,backup方法,備份又分為同步備份和異步備份,相加等于總的需要備份數。map操作預設備份一份資料且是同步的,異步的預設為0.異步備份不會阻塞操作。備份的operation是Backup,它的run會執行PutOperation的run方法,即把資料放到緩存中并修改版本,這裡的run不會再執行複制操作。

<hazelcast>

  <map name="default">

    <backup-count>1</backup-count>

    <async-backup-count>1</async-backup-count>

  </map>

</hazelcast>

operation的run同樣會在備份節點上執行,putoperation其實就是在本地緩存更新值。備份的過程沒有一個ack機制,資訊傳輸的可靠性如何保證?

一個節點調用map的put操作時,會在本節點上緩存這個結果,再把operation傳輸到對應partition的第一個備份節點(這個節點可能就是自己本地節點)上,第一個節點接收到後備份到第二個節點上,是以預設就隻有兩個備份資料。是以nearcache緩存是可能存在每個節點上的。

PutOperation的afterRun方法主要是觸發一些攔截器,觸發各個節點的事件監聽器什麼的、更新各個備份節點緩存等等。

IOBalancer平衡io讀取寫入。

packet封包結構,1byte版本+2byte頭部+4byte分區+4byte長度+nbyte消息。

主節點一個一個發給其他成員節點關于成員的消息,其他節點進行更新。

ServiceManagerImpl用于管理所有的遠端,啟動時會注冊常用的一些服務,包括如下的服務,什麼map、queue什麼的

    private void registerCoreServices() {

        Node node = nodeEngine.getNode();

        registerService(ClusterServiceImpl.SERVICE_NAME, node.getClusterService());

        registerService(InternalPartitionService.SERVICE_NAME, node.getPartitionService());

        registerService(ProxyServiceImpl.SERVICE_NAME, nodeEngine.getProxyService());

        registerService(TransactionManagerServiceImpl.SERVICE_NAME, nodeEngine.getTransactionManagerService());

        registerService(ClientEngineImpl.SERVICE_NAME, node.clientEngine);

        registerService(QuorumServiceImpl.SERVICE_NAME, nodeEngine.getQuorumService());

    private void registerDefaultServices(ServicesConfig servicesConfig) {

        registerService(MapService.SERVICE_NAME, createService(MapService.class));

        registerService(LockService.SERVICE_NAME, new LockServiceImpl(nodeEngine));

        registerService(QueueService.SERVICE_NAME, new QueueService(nodeEngine));

        registerService(TopicService.SERVICE_NAME, new TopicService());

        registerService(ReliableTopicService.SERVICE_NAME, new ReliableTopicService(nodeEngine));

        registerService(MultiMapService.SERVICE_NAME, new MultiMapService(nodeEngine));

        registerService(ListService.SERVICE_NAME, new ListService(nodeEngine));

        registerService(SetService.SERVICE_NAME, new SetService(nodeEngine));

        registerService(DistributedExecutorService.SERVICE_NAME, new DistributedExecutorService());

        registerService(AtomicLongService.SERVICE_NAME, new AtomicLongService());

        registerService(AtomicReferenceService.SERVICE_NAME, new AtomicReferenceService());

        registerService(CountDownLatchService.SERVICE_NAME, new CountDownLatchService());

        registerService(SemaphoreService.SERVICE_NAME, new SemaphoreService(nodeEngine));

        registerService(IdGeneratorService.SERVICE_NAME, new IdGeneratorService(nodeEngine));

        registerService(MapReduceService.SERVICE_NAME, new MapReduceService(nodeEngine));

        registerService(ReplicatedMapService.SERVICE_NAME, new ReplicatedMapService(nodeEngine));

        registerService(RingbufferService.SERVICE_NAME, new RingbufferService(nodeEngine));

        registerService(XAService.SERVICE_NAME, new XAService(nodeEngine));

        registerCacheServiceIfAvailable();

Map通過AbstractMapServiceFactory建立,使用MapRemoteService處理遠端操作,RemoteService服務有兩個方法createDistributedObject和destroyDistributedObject方法,最終是通過MapProxyImpl。Map的partition政策在MapContainer裡面。

PutOperation用于執行叢集節點的put操作。一緻性哈希根據key使用MurmurHash哈希計算出結果,再根據分區數(預設271)取餘。每個節點都是使用new ConcurrentHashMap<Data, Record>(1000, 0.75f, 1)存放記錄。

成員組MemberGroup一共有若幹個組,多少個成員就多少個組,最大的複制節點數為7,如果成員組小于7則使用成員組數量。

address[271][4]

Address[271][7] 

1、addr0 addr9 addr3 addr4 null null null

2、.

.

271、addr1 addr2 addr3 addr4 null null null

對配置設定的結果嘗試重新配置設定,把過載的組配置設定一些成員給不足的組,并檢測每個成員組内的節點數相差不會超過系數1.1,否則重新分組,盡可能達到均勻。

最終形成一張表,271個分區每個分區都對應着若幹個複制的成員位址。

int avgPartitionPerGroup = partitionCount / groupSize;

就是說最多4個組,假如5個結點,分組情況為2,1,1,1,則每個組分到的partition個數為271/4,可能有些組多1個partition。

一共有271條線程處理operation,OperationThread,裡面有隊列ScheduleQueue,線程會不斷處理,ScheduleQueue用于operation執行緩沖隊列,裡面的有兩種隊列,normalQueue和priorityQueue,一種用于正常的排隊,一種用于設定優先級的隊列,take方法會優先從priorityQueue中擷取需要優先處理的operation。

有個同步複制、異步複制。

OperationServiceImpl.createInvocationBuilder(此方法有兩個,一個用于partition、一個用于指定的address),->  執行個體化一個InvocationBuilderImpl對象,調用invoke方法會建立Invocation,Invocation包含PartitionInvocation(針對分區)和TargetInvocation(針對指定節點)兩種。

在叢集中傳輸一切以Data形式傳輸。

Map的服務名為hz:impl:mapService。

假如HazelcastInstance執行instance.getMap("customers")則,通過HazelcastInstanceProxy的getMap方法,代理是調HazelcastInstanceImpl的getMap方法,調用getDistributedObject方法,它會通過ProxyService(它代理了所有服務)代理找到MapService,調用mapservice的createDistributedObject方法建立DistributedObject,間接調用MapRemoteService的createDistributedObject方法建立MapProxyImpl,調用MapProxyImpl的put方法把資料放到叢集,主要如下操作

final Data key = toData(k, partitionStrategy);

final Data value = toData(v);

final Data result = putInternal(key, value, ttl, timeunit);

return (V) toObject(result);

分别将key和value轉化為Data,其實是序列化,友善網絡傳輸。putInternal方法使用PutOperation,并且要根據key計算出partitionId,接着再完成operation的調用。

SerializationServiceImpl提供各種類型的序列化支援,toData提供由object到Data的轉化,toObject提供由Data到object的轉化,Data預設是使用DefaultData,DefaultData裡面其實就是包含了一個位元組數組還有不同的偏移量,例如從幾位到幾位表示類型,序列化工作其實也跟這種類似,把某一對象的相關資訊轉化為位元組數組,傳遞到目的地後再根據約定反向組裝成指定對象。

OperationThread專門用于執行接收到的請求,process方法,可以有三種請求,包括Operation、Packet、PartitionSpecificRunnable,分别不同的處理邏輯,Operation則直接反序列化後調用beforeRun、run、afterRun等方法。

HealthMonitor是獨立一條線程用于監控健康,包括

private class HealthMetrics {

        private final long memoryFree;

        private final long memoryTotal;

        private final long memoryUsed;

        private final long memoryMax;

        private final double memoryUsedOfTotalPercentage;

        private final double memoryUsedOfMaxPercentage;

        //following three load variables are always between 0 and 100.

        private final double processCpuLoad;

        private final double systemLoadAverage;

        private final double systemCpuLoad;

        private final int threadCount;

        private final int peakThreadCount;

        private final long clusterTimeDiff;

        private final int asyncExecutorQueueSize;

        private final int clientExecutorQueueSize;

        private final int queryExecutorQueueSize;

        private final int scheduledExecutorQueueSize;

        private final int systemExecutorQueueSize;

        private final int eventQueueSize;

        private final int pendingInvocationsCount;

        private final double pendingInvocationsPercentage;

        private final int operationServiceOperationExecutorQueueSize;

        private final int operationServiceOperationPriorityExecutorQueueSize;

        private final int operationServiceOperationResponseQueueSize;

        private final int runningOperationsCount;

        private final int remoteOperationsCount;

        private final int proxyCount;

        private final int clientEndpointCount;

        private final int activeConnectionCount;

        private final int currentClientConnectionCount;

        private final int connectionCount;

        private final int ioExecutorQueueSize;

PerformanceMonitor表示性能監控,監控的參數包括inSelector的已讀取數量,outSelect的已寫入事件數量,OperationService相關的性能參數,例如挂起的調用比例、總體調用比例、最大調用數量、271個分區線程已執行數量、271個分區operation線程正在挂起線程(即任務排隊隊列中的數量),正常operation線程的排隊隊列數量、正常operation線程已執行數量、響應線程已執行數量、響應線程排隊隊列數量。

hazelcast核心——Node,包含各種各樣重要的基礎服務,日志、節點關閉鈎子、序列化服務、節點引擎、用戶端引擎、分區服務、叢集服務、廣播服務、連接配接管理服務、指令服務、配置檔案服務、群組屬性服務、本節點位址、本地叢集成員對象、主節點位址、hazelcast執行個體引用、日志服務、叢集節點加入服務、節點擴充服務、管理中心服務、安全上下文、建立資訊服務、版本校對服務、hazelcast線程組。

public class Node {

    private final ILogger logger;

    private final NodeShutdownHookThread shutdownHookThread = new NodeShutdownHookThread("hz.ShutdownThread");

    private final SerializationService serializationService;

    public final NodeEngineImpl nodeEngine;

    public final ClientEngineImpl clientEngine;

    public final InternalPartitionService partitionService;

    public final ClusterServiceImpl clusterService;

    public final MulticastService multicastService;

    public final ConnectionManager connectionManager;

    public final TextCommandServiceImpl textCommandService;

    public final Config config;

    public final GroupProperties groupProperties;

    public final Address address;

    public final MemberImpl localMember;

    private volatile Address masterAddress = null;

    public final HazelcastInstanceImpl hazelcastInstance;

    public final LoggingServiceImpl loggingService;

    private final Joiner joiner;

    private final NodeExtension nodeExtension;

    private ManagementCenterService managementCenterService;

    public final SecurityContext securityContext;

    private final ClassLoader configClassLoader;

    private final BuildInfo buildInfo;

    private final VersionCheck versionCheck = new VersionCheck();

    private final HazelcastThreadGroup hazelcastThreadGroup;

NodeEngineImpl作為節點引擎包含了許多服務,重要的例如,事件服務、operation服務、執行服務、等待通知服務、service(内置許多service,例如Map、Queue,使用者自定義的服務可配置到hazelcast.xml,啟動時會加載進來)管理服務、事務管理服務、代理服務、wan複制服務、包傳輸服務、證明人服務。

public class NodeEngineImpl implements NodeEngine {

    private final Node node;

    private final EventServiceImpl eventService;

    private final OperationServiceImpl operationService;

    private final ExecutionServiceImpl executionService;

    private final WaitNotifyServiceImpl waitNotifyService;

    private final ServiceManagerImpl serviceManager;

    private final TransactionManagerServiceImpl transactionManagerService;

    private final ProxyServiceImpl proxyService;

    private final WanReplicationService wanReplicationService;

    private final PacketTransceiver packetTransceiver;

    private final QuorumServiceImpl quorumService;

ServiceManagerImpl用于管理所有服務,啟動時預設會執行個體化核心的service、預設的service,如果使用者通過配置檔案配置了自定義service則也會執行個體化。啟動時注冊即将service執行個體put到ConcurrentMap中,核心service包括ClusterServiceImpl、InternalPartitionService、ProxyServiceImpl、TransactionManagerServiceImpl、ClientEngineImpl、QuorumServiceImpl。預設service包括MapService、LockService、QueueService、TopicService、ReliableTopicService、MultiMapService、ListService、SetService、DistributedExecutorService、AtomicLongService、AtomicReferenceService、CountDownLatchService、SemaphoreService、IdGeneratorService、MapReduceService、ReplicatedMapService、RingbufferService、XAService。如果允許還将把緩存服務CacheService添加進來。

public final class ServiceManagerImpl implements ServiceManager {

    private final ConcurrentMap<String, ServiceInfo> services = new ConcurrentHashMap<String, ServiceInfo>(20, .75f, 1);

節點加入,Joiner負責加入工作,例如廣播則使用MulticastJoiner、單點傳播則使用TcpIpJoiner、AWS則使用TcpIpJoinerOverAWS。Node啟動時會根據情況啟動個線程,

multicast隻是做節點發現工作,真正的節點加入工作是交由tcpip做,向主節點發送加入請求,主節點把請求節點添加到成員清單中,然後傳回請求節點讓它把主節點位址設定為本人。

DefaultSerializers包括DateSerializer、ObjectSerializer、ClassSerializer等等序列化器,實作StreamSerializer的read和write方法完成序列化和反序列化處理。

Date.class, new DateSerializer());

        BigInteger.class, new BigIntegerSerializer());

        BigDecimal.class, new BigDecimalSerializer());

        Externalizable.class, new Externalizer(enableCompression));

        Serializable.class, new ObjectSerializer(enableSharedObject, enableCompression));

        Class.class, new ClassSerializer());

        Enum.class, new EnumSerializer());

DataSerializable.class, dataSerializerAdapter);

        Portable.class, portableSerializerAdapter);

        Byte.class, new ByteSerializer());

        Boolean.class, new BooleanSerializer());

        Character.class, new CharSerializer());

        Short.class, new ShortSerializer());

        Integer.class, new IntegerSerializer());

        Long.class, new LongSerializer());

        Float.class, new FloatSerializer());

        Double.class, new DoubleSerializer());

        byte[].class, new TheByteArraySerializer());

        char[].class, new CharArraySerializer());

        short[].class, new ShortArraySerializer());

        int[].class, new IntegerArraySerializer());

        long[].class, new LongArraySerializer());

        float[].class, new FloatArraySerializer());

        double[].class, new DoubleArraySerializer());

        String.class, new StringSerializer());

每個service都有自己的context,例如mapservice的MapServiceContext,它裡面是保留了一份partition映射表的副本,在底層完成遷移之前并不會更新,當然底層的map資料也不會一邊遷移一邊删除,而是複制一份進行删除

==========廣告時間==========

鄙人的新書《Tomcat核心設計剖析》已經在京東預售了,有需要的朋友可以到 https://item.jd.com/12185360.html 進行預定。感謝各位朋友。

=========================

繼續閱讀