According to Guice's binding mechanism, the discovery bindings are registered in the following order:
InternalNode adds the DiscoveryModule:
modules.add(new DiscoveryModule(settings));
DiscoveryModule then spawns either a LocalDiscoveryModule (not analyzed here) or a ZenDiscoveryModule:
public Iterable<? extends Module> spawnModules() {
    Class<? extends Module> defaultDiscoveryModule;
    if (DiscoveryNode.localNode(settings)) {
        defaultDiscoveryModule = LocalDiscoveryModule.class;
    } else {
        defaultDiscoveryModule = ZenDiscoveryModule.class;
    }
    return ImmutableList.of(Modules.createModule(settings.getAsClass(DISCOVERY_TYPE_KEY, defaultDiscoveryModule, "org.elasticsearch.discovery.", "DiscoveryModule"), settings));
}
ZenDiscoveryModule binds the Discovery interface to the ZenDiscovery class:
protected void bindDiscovery() {
    bind(Discovery.class).to(ZenDiscovery.class).asEagerSingleton();
}
InternalNode then starts the DiscoveryService:
DiscoveryService discoService = injector.getInstance(DiscoveryService.class).start();
So what does DiscoveryService ultimately get wired to? To ZenDiscovery:
@Inject
public DiscoveryService(Settings settings, Discovery discovery) {
    super(settings);
    this.discovery = discovery;
    this.initialStateTimeout = componentSettings.getAsTime("initial_state_timeout", TimeValue.timeValueSeconds(30));
}
As you can see, the constructor is injected with a Discovery instance, and because of the binding above that instance is ultimately a ZenDiscovery.
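The whole chain can be illustrated with a minimal hand-rolled sketch (no Guice dependency; the class names mirror the real ones but the bodies are simplified stand-ins, not Elasticsearch code):

```java
import java.util.HashMap;
import java.util.Map;

// Toy binder: maps an interface to a pre-built instance, mimicking
// bind(Discovery.class).to(ZenDiscovery.class).asEagerSingleton().
class ToyInjector {
    private final Map<Class<?>, Object> singletons = new HashMap<>();

    <T> void bind(Class<T> iface, T impl) {
        singletons.put(iface, impl); // eager singleton: one instance, stored at bind time
    }

    <T> T getInstance(Class<T> iface) {
        return iface.cast(singletons.get(iface));
    }
}

interface Discovery { String nodeDescription(); }

class ZenDiscovery implements Discovery {
    public String nodeDescription() { return "zen-discovery/local-node"; }
}

// Like the real DiscoveryService: it only asks for a Discovery,
// and receives whatever implementation was bound.
class DiscoveryService {
    final Discovery discovery;
    DiscoveryService(Discovery discovery) { this.discovery = discovery; }
}
```

Binding Discovery to a ZenDiscovery instance and then constructing DiscoveryService from `getInstance(Discovery.class)` shows why the service ends up wrapping ZenDiscovery without ever naming it.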
Starting DiscoveryService therefore means starting ZenDiscovery:
protected void doStart() throws ElasticsearchException {
    initialStateListener = new InitialStateListener();
    discovery.addListener(initialStateListener);
    discovery.start();
    logger.info(discovery.nodeDescription());
}
Besides sharing transport and other modules, ZenDiscovery also sets up several helper classes:
this.electMaster = new ElectMasterService(settings);
nodeSettingsService.addListener(new ApplySettings());
this.masterFD = new MasterFaultDetection(settings, threadPool, transportService, this);
this.masterFD.addListener(new MasterNodeFailureListener());
this.nodesFD = new NodesFaultDetection(settings, threadPool, transportService);
this.nodesFD.addListener(new NodeFailureListener());
this.publishClusterState = new PublishClusterStateAction(settings, transportService, this, new NewClusterStateListener(), discoverySettings);
this.pingService.setNodesProvider(this);
this.membership = new MembershipAction(settings, transportService, this, new MembershipListener());
NodesFaultDetection: monitors the status of the other nodes in the cluster (used by the master node).
It maintains a map from each Node to its NodeFD, and registers a PingRequestHandler and an FDConnectionListener (depending on the settings), along with their handling logic.
private final ConcurrentMap<DiscoveryNode, NodeFD> nodesFD = newConcurrentMap();
updateNodes keeps this data structure up to date:
public void updateNodes(DiscoveryNodes nodes) {
    DiscoveryNodes prevNodes = latestNodes;
    this.latestNodes = nodes;
    if (!running) {
        return;
    }
    DiscoveryNodes.Delta delta = nodes.delta(prevNodes);
    for (DiscoveryNode newNode : delta.addedNodes()) {
        if (newNode.id().equals(nodes.localNodeId())) {
            // no need to monitor the local node
            continue;
        }
        if (!nodesFD.containsKey(newNode)) {
            nodesFD.put(newNode, new NodeFD());
            threadPool.schedule(pingInterval, ThreadPool.Names.SAME, new SendPingRequest(newNode));
        }
    }
    for (DiscoveryNode removedNode : delta.removedNodes()) {
        nodesFD.remove(removedNode);
    }
}
It compares the current cluster nodes with the previously recorded ones to determine which nodes were added or removed; the local node needs no monitoring. Whatever remains in nodesFD is the set of nodes that currently has to be checked.
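Stripped of the scheduling and listener plumbing, the delta step boils down to two set differences. A minimal sketch (plain strings stand in for DiscoveryNode; the class name is illustrative, not from Elasticsearch):

```java
import java.util.HashSet;
import java.util.Set;

// Stand-in for DiscoveryNodes.delta(): nodes present now but not before
// are "added"; nodes present before but not now are "removed".
class NodesDeltaSketch {
    static Set<String> added(Set<String> prev, Set<String> current) {
        Set<String> d = new HashSet<>(current);
        d.removeAll(prev);
        return d;
    }

    static Set<String> removed(Set<String> prev, Set<String> current) {
        Set<String> d = new HashSet<>(prev);
        d.removeAll(current);
        return d;
    }
}
```

With prev = {A, B} and current = {B, C}, added is {C} and removed is {A}: NodesFaultDetection would schedule a SendPingRequest for C (unless C is the local node) and drop A from nodesFD.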
MasterFaultDetection: monitors the status of the master node (used by non-master nodes).
ElectMasterService: elects the master node.
Now let's look at ZenDiscovery's startup flow:
protected void doStart() throws ElasticsearchException {
    Map<String, String> nodeAttributes = discoveryNodeService.buildAttributes();
    // note, we rely on the fact that its a new id each time we start, see FD and "kill -9" handling
    final String nodeId = DiscoveryService.generateNodeId(settings);
    localNode = new DiscoveryNode(settings.get("name"), nodeId, transportService.boundAddress().publishAddress(), nodeAttributes, version);
    latestDiscoNodes = new DiscoveryNodes.Builder().put(localNode).localNodeId(localNode.id()).build();
    nodesFD.updateNodes(latestDiscoNodes);
    pingService.start();
    // do the join on a different thread, the DiscoveryService waits for 30s anyhow till it is discovered
    asyncJoinCluster();
}
The NodesFaultDetection instance nodesFD updates the map it maintains (new nodes may have joined, and some may have been removed in the meantime, so a fresh delta is computed).
Then the node starts the join flow via asyncJoinCluster, which eventually reaches innerJoinCluster:
The rough logic is: first call findMaster. If the master is this node itself, mark it as such, start NodesFaultDetection to monitor the remaining nodes, and publish the fact that this node is the master. If the master is another node, keep trying to connect to it until the connection succeeds, send it a request to join the cluster, and start MasterFaultDetection to monitor the master's status.
findMaster determines the master node: it pings the other nodes and, from the attributes in the responses, collects the possible master candidates (filtering out client nodes, and data nodes that are not master-eligible). It then checks whether the number of master-eligible nodes seen so far satisfies the minimum master nodes setting; if not, some nodes have not started yet, so it returns null and keeps waiting until the minimum is reached. From there, two cases arise: if the ping responses already report an existing master node, one master is chosen from the masters contained in those responses; if no ping response reports a master, one is elected from the master-eligible candidates. The election policy is handled by ElectMasterService.
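The quorum check and the election can be condensed into a small sketch. This is a simplification, not the real ElectMasterService: in this era of Elasticsearch the election was, to a first approximation, deterministic by sorting the master-eligible candidates and taking the one with the smallest node id, and `minimumMasterNodes` stands in for the discovery.zen.minimum_master_nodes setting:

```java
import java.util.Collections;
import java.util.List;

// Hypothetical condensation of the findMaster/ElectMasterService decision.
class ElectMasterSketch {
    final int minimumMasterNodes; // mirrors discovery.zen.minimum_master_nodes

    ElectMasterSketch(int minimumMasterNodes) {
        this.minimumMasterNodes = minimumMasterNodes;
    }

    // candidates: ids of master-eligible nodes seen in ping responses.
    // Returns null while the quorum is not reached (caller keeps waiting).
    String electMaster(List<String> candidates) {
        if (candidates.size() < minimumMasterNodes) {
            return null; // not enough master-eligible nodes have started yet
        }
        return Collections.min(candidates); // deterministic: smallest id wins
    }
}
```

With minimum_master_nodes = 2, a single candidate yields null (keep waiting); once a second master-eligible node answers, every node computes the same winner, which is what makes the election converge without extra coordination.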