
elasticsearch source code analysis --- the discovery module

Following Guice's registration and binding mechanism, discovery is wired up in the following order:

InternalNode adds the DiscoveryModule:

modules.add(new DiscoveryModule(settings));
           

DiscoveryModule then spawns either a LocalDiscoveryModule (not covered here) or a ZenDiscoveryModule:

public Iterable<? extends Module> spawnModules() {
        Class<? extends Module> defaultDiscoveryModule;
        if (DiscoveryNode.localNode(settings)) {
            defaultDiscoveryModule = LocalDiscoveryModule.class;
        } else {
            defaultDiscoveryModule = ZenDiscoveryModule.class;
        }
        return ImmutableList.of(Modules.createModule(settings.getAsClass(DISCOVERY_TYPE_KEY, defaultDiscoveryModule, "org.elasticsearch.discovery.", "DiscoveryModule"), settings));
    }
           

ZenDiscoveryModule binds the Discovery class to the ZenDiscovery class:

protected void bindDiscovery() {
        bind(Discovery.class).to(ZenDiscovery.class).asEagerSingleton();
    }
           

InternalNode starts the DiscoveryService:

DiscoveryService discoService = injector.getInstance(DiscoveryService.class).start();
           

So what does DiscoveryService ultimately bind to? The ZenDiscovery class:

@Inject
    public DiscoveryService(Settings settings, Discovery discovery) {
        super(settings);
        this.discovery = discovery;
        this.initialStateTimeout = componentSettings.getAsTime("initial_state_timeout", TimeValue.timeValueSeconds(30));
    }
           

As you can see, the constructor depends on Discovery, so the instance injected here ultimately ends up being ZenDiscovery.
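
To make the binding chain concrete, here is a minimal, self-contained Guice example. The nested classes are stand-ins for the real Elasticsearch ones; only the bind(...).to(...).asEagerSingleton() line mirrors what ZenDiscoveryModule actually does:

import com.google.inject.AbstractModule;
import com.google.inject.Guice;
import com.google.inject.Inject;
import com.google.inject.Injector;

public class BindingSketch {
    interface Discovery {}
    static class ZenDiscovery implements Discovery {}

    static class DiscoveryService {
        final Discovery discovery;
        @Inject
        DiscoveryService(Discovery discovery) {
            this.discovery = discovery;   // receives whatever Discovery is bound to
        }
    }

    public static void main(String[] args) {
        Injector injector = Guice.createInjector(new AbstractModule() {
            @Override
            protected void configure() {
                // same shape as ZenDiscoveryModule.bindDiscovery()
                bind(Discovery.class).to(ZenDiscovery.class).asEagerSingleton();
            }
        });
        DiscoveryService service = injector.getInstance(DiscoveryService.class);
        System.out.println(service.discovery.getClass().getSimpleName()); // prints ZenDiscovery
    }
}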

Starting DiscoveryService therefore means starting ZenDiscovery:

protected void doStart() throws ElasticsearchException {
        initialStateListener = new InitialStateListener();
        discovery.addListener(initialStateListener);
        discovery.start();
        logger.info(discovery.nodeDescription());
    }
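
The InitialStateListener registered here is what lets DiscoveryService wait (up to initial_state_timeout, 30 seconds by default) for the first cluster state to be applied. Below is a minimal sketch of that idea, assuming a latch-based implementation rather than quoting the real class:

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Assumed shape, not the literal ES code: the discovery implementation counts
// the latch down once the initial cluster state has been processed, and the
// startup thread blocks on waitForInitialState() for at most the timeout.
class InitialStateListenerSketch {
    private final CountDownLatch latch = new CountDownLatch(1);

    // called by the discovery implementation when the initial state is applied
    public void initialStateProcessed() {
        latch.countDown();
    }

    // called by DiscoveryService during startup
    public boolean waitForInitialState(long timeoutSeconds) throws InterruptedException {
        return latch.await(timeoutSeconds, TimeUnit.SECONDS);
    }
}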
           

Besides sharing the transport and other modules, ZenDiscovery also wires up the following helper classes:

this.electMaster = new ElectMasterService(settings);
        nodeSettingsService.addListener(new ApplySettings());

        this.masterFD = new MasterFaultDetection(settings, threadPool, transportService, this);
        this.masterFD.addListener(new MasterNodeFailureListener());

        this.nodesFD = new NodesFaultDetection(settings, threadPool, transportService);
        this.nodesFD.addListener(new NodeFailureListener());

        this.publishClusterState = new PublishClusterStateAction(settings, transportService, this, new NewClusterStateListener(), discoverySettings);
        this.pingService.setNodesProvider(this);
        this.membership = new MembershipAction(settings, transportService, this, new MembershipListener());
           

NodesFaultDetection: monitors the state of the other cluster nodes (used by the master node).

It maintains a map from DiscoveryNode to NodeFD, and registers a PingRequestHandler and (depending on the settings) an FDConnectionListener, together with their handling logic.

private final ConcurrentMap<DiscoveryNode, NodeFD> nodesFD = newConcurrentMap();
           

updateNodes keeps this data structure up to date:

public void updateNodes(DiscoveryNodes nodes) {
        DiscoveryNodes prevNodes = latestNodes;
        this.latestNodes = nodes;
        if (!running) {
            return;
        }
        DiscoveryNodes.Delta delta = nodes.delta(prevNodes);
        for (DiscoveryNode newNode : delta.addedNodes()) {
            if (newNode.id().equals(nodes.localNodeId())) {
                // no need to monitor the local node
                continue;
            }
            if (!nodesFD.containsKey(newNode)) {
                nodesFD.put(newNode, new NodeFD());
                threadPool.schedule(pingInterval, ThreadPool.Names.SAME, new SendPingRequest(newNode));
            }
        }
        for (DiscoveryNode removedNode : delta.removedNodes()) {
            nodesFD.remove(removedNode);
        }
    }
           

By diffing the current cluster nodes against the previously recorded ones, it determines which nodes were added or removed; the local node does not need to be monitored. The nodes left in nodesFD are exactly the ones that currently need fault detection.
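
Once a node lands in nodesFD, a periodic ping loop keeps checking it. The following is a hedged, self-contained sketch of that idea; the real NodesFaultDetection goes through the transport layer and ThreadPool, and names like pingNode() and onNodeFailure() here are hypothetical stand-ins:

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Each monitored node gets a periodic ping; consecutive failures beyond a
// retry limit mark the node as failed and notify the registered listeners.
class FaultDetectionSketch {
    private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
    private final long pingIntervalSeconds = 1;
    private final int pingRetryCount = 3;

    void monitor(String nodeId) {
        schedulePing(nodeId, 0);
    }

    private void schedulePing(String nodeId, int failures) {
        scheduler.schedule(() -> {
            if (pingNode(nodeId)) {
                schedulePing(nodeId, 0);                 // healthy: reset and keep pinging
            } else if (failures + 1 >= pingRetryCount) {
                onNodeFailure(nodeId);                   // too many consecutive failures
            } else {
                schedulePing(nodeId, failures + 1);      // retry
            }
        }, pingIntervalSeconds, TimeUnit.SECONDS);
    }

    boolean pingNode(String nodeId) { return true; }     // hypothetical transport ping
    void onNodeFailure(String nodeId) { /* notify listeners, e.g. remove the node */ }
}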

MasterFaultDetection: monitors the state of the master node (used by non-master nodes).

ElectMasterService: elects the master node.

Now let's look at ZenDiscovery's startup flow:

protected void doStart() throws ElasticsearchException {
        Map<String, String> nodeAttributes = discoveryNodeService.buildAttributes();
        // note, we rely on the fact that its a new id each time we start, see FD and "kill -9" handling
        final String nodeId = DiscoveryService.generateNodeId(settings);
        localNode = new DiscoveryNode(settings.get("name"), nodeId, transportService.boundAddress().publishAddress(), nodeAttributes, version);
        latestDiscoNodes = new DiscoveryNodes.Builder().put(localNode).localNodeId(localNode.id()).build();
        nodesFD.updateNodes(latestDiscoNodes);
        pingService.start();

        // do the join on a different thread, the DiscoveryService waits for 30s anyhow till it is discovered
        asyncJoinCluster();
    }
           

The NodesFaultDetection instance nodesFD refreshes the map it maintains (new nodes may have joined, and some nodes may have been removed at this point, so a new delta is computed).

It then starts asyncJoinCluster, the flow that joins the local node to the cluster, which eventually ends up in innerJoinCluster.

Roughly, it first calls findMaster. If the master turns out to be the local node, it flags itself as master, starts NodesFaultDetection to monitor the remaining nodes, and publishes a cluster state announcing that it is the master. If the master is another node, it keeps trying to connect to that master until the connection succeeds, sends it a request to join the cluster, and starts MasterFaultDetection to monitor the master's state.
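
A hedged sketch of that control flow, using simplified stand-in names rather than the real ZenDiscovery fields and signatures, just to show the two branches:

class JoinClusterSketch {
    private final String localNode = "node-1";
    private boolean master = false;

    void innerJoinCluster() {
        String masterNode = null;
        while (masterNode == null) {
            masterNode = findMaster();             // ping, filter candidates, elect
        }

        if (localNode.equals(masterNode)) {
            master = true;                         // mark ourselves as master
            startNodesFaultDetection();            // watch the other nodes
            publishMasterClusterState();           // announce the new cluster state
        } else {
            while (!connectToMaster(masterNode)) { /* retry until the master is reachable */ }
            sendJoinRequest(masterNode);           // ask the master to add us to the cluster
            startMasterFaultDetection(masterNode); // watch the master's health
        }
    }

    // hypothetical stubs standing in for the real transport/fault-detection plumbing
    String findMaster() { return "node-1"; }
    boolean connectToMaster(String node) { return true; }
    void sendJoinRequest(String node) {}
    void startNodesFaultDetection() {}
    void startMasterFaultDetection(String node) {}
    void publishMasterClusterState() {}
}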

findMaster determines the master node. It pings the other nodes and, based on the node attributes in the responses, collects the possible master candidates (filtering out client nodes and nodes that hold data but are not master-eligible). It then checks whether the master-eligible nodes found so far satisfy the minimum master nodes requirement; if not, some nodes have not started yet, so it returns null and keeps waiting until enough master-eligible nodes are available. Two cases follow: if any of the ping responses already report an active master, one of the masters contained in those responses is chosen and returned; if none of the ping responses report a master, one of the master-eligible nodes is elected as master. The election strategy is handled by ElectMasterService.
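
Here is a minimal sketch of that decision logic. PingReply and the method below are hypothetical stand-ins, not the actual ZenPing/ElectMasterService API; sorting the candidates and taking the smallest id is meant to mimic ElectMasterService's deterministic election:

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

class FindMasterSketch {
    // a ping response carrying the responding master-eligible node and the
    // master it currently knows about (null if it knows none)
    static class PingReply {
        final String node;
        final String knownMaster;
        PingReply(String node, String knownMaster) { this.node = node; this.knownMaster = knownMaster; }
    }

    String findMaster(List<PingReply> replies, int minimumMasterNodes) {
        if (replies.size() < minimumMasterNodes) {
            return null;                              // not enough master-eligible nodes yet, keep waiting
        }
        // 1) if any reply already names an active master, pick among those
        List<String> activeMasters = new ArrayList<>();
        for (PingReply reply : replies) {
            if (reply.knownMaster != null) {
                activeMasters.add(reply.knownMaster);
            }
        }
        if (!activeMasters.isEmpty()) {
            return Collections.min(activeMasters);    // deterministic pick among reported masters
        }
        // 2) otherwise elect one of the master-eligible candidates
        List<String> candidates = new ArrayList<>();
        for (PingReply reply : replies) {
            candidates.add(reply.node);
        }
        Collections.sort(candidates);
        return candidates.get(0);
    }
}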
