天天看點

你想擁有自己的搜尋引擎嗎?-elasticsearch篇

背景

現有某大型政務系統,采用微服務架構,目前正在運作的節點數約有150個,為了更好的監控系統内各節點的運作情況,就需要搜集每個節點的運作日志,目前每天産生的日志量約為1個G,如此龐大的資料量,使用傳統的資料庫來存儲顯然是不現實的,現我們需要一個可以具有大存儲、高并發、可拓展、秒級查詢能力的高可用的分布式日志系統。普元技術中台的日志子產品基于分布式搜尋引擎elasticsearch實作了大資料量日志的存儲、查詢和管理等功能,elasticsearch的分布式架構特性為我們建構分布式日志系統提供了基礎保障。下面就讓我們來聊聊elasticsearch的分布式架構吧。

1. elasticsearch叢集基本概念和原理

1.1 叢集節點角色

主節點(Master node)

主節點負責叢集層面上的操作,叢集管理等,通過配置node.master:true(預設) 使這個節點具有Master節點的資格,主節點是全局唯一的,是在具有Master節點資格中選舉。

主節點也可以作為資料節點,但是盡量不要這麼做,生成環境中,盡量分離主節點和資料節點,建立獨立的主節點配置如下:

node.master: true

node.data: false

資料節點(Data node)

負責儲存資料、執行CRUD、搜尋、聚合等操作。資料節點對I/O、記憶體、CPU要求較高。一般情況下資料讀寫隻和Data node互動,不和Master node互動

node.master: false

node.data: true

node.ingest: false

協調節點(Coordinating node)

用戶端請求可以發送到叢集的任何節點,每個節點知道任意文檔所處的位置,然後轉發這些請求,收集資料并且傳回給用戶端,處理用戶端請求的節點稱為協調節點。

協調節點将請求轉發給資料節點。每個資料節點在本地執行請求,并将結果傳回給協調節點。協調節點将每個資料節點傳回的結果合并,對結果進行排序,這個排序過程需要很多CPU和記憶體資源

預處理節點(Ingest node)

在es寫入資料之前,通過預定義好的processors和pipeline對資料進行過濾、轉換。processors和pipeline攔截bulk和index請求,在應用相關操作後将文檔傳回index和bulk api;預設情況下節點上啟用ingest,如果想在某個節點上關閉ingest,則設定node.ingest: false

1.2 叢集健康狀态

從資料完成性角度劃分,叢集健康狀态分為三種:

· Green 所有的主分片和副分片都正常運作

· Yellow 所有的主分片都正常運作,但是副分片不全是正常運作的,意味着存在單節點故障的風險

· Red 有主分片沒有正常運作

每個索引也有這三種狀态,假如丢失了一個副分片,該分片所屬的索引和整個叢集變為Yellow狀态,其他的索引為Green

1.3 主要内部子產品簡介

Cluster

Cluster子產品是主節點執行叢集管理的封裝實作,管理叢集狀态,維護叢集層面的叢集資訊。

主要功能:

· 管理叢集狀态,将新生成的叢集狀态釋出到叢集所有節點。

· 在叢集各個節點中直接遷移分片,保持資料平衡。

· 調用allocation子產品執行分片配置設定,決策哪些分片應該配置設定到哪個節點

Discovery

發現子產品負責發現叢集中的節點,以及選舉主節點。當節點加入或退出叢集,主節點會采取相應的行動。從某種角度來說,發現子產品起到類似Zookeeper的作用,選主并管理叢集拓撲。

Indices

索引子產品管理全局級的索引設定,不包括索引級的(索引設定分為全局級和每個索引級),它還封裝了索引資料恢複功能。叢集啟動階段需要的主分片恢複和副分片恢複就是這個子產品實作的

gateway

負責對收到Master 廣播下的clutser state 資料的持久化存儲,并且在叢集重新開機時恢複它們。

allocation

封裝了分片配置設定相關的功能和政策,包括主分片的配置設定和副分片的配置設定,本子產品主要是節點的調用,建立新索引、叢集完全重新開機都需要分片配置設定的過程。

HTTP

http子產品允許通過JSON over HTTP的方式通路ES 的API,HTTP子產品的本質是完全異步,這意味着沒有阻塞線程等待響應,使用異步通信進行HTTP的好處是解決10K量級的并發連接配接

Transport

Transport子產品用于叢集内節點之間的内部通信。從一個節點到另外一個節點每個内部請求都使用Transport子產品。

如同HTTP子產品,Transport子產品本質也是完全異步的,Transport子產品使用的是TCP通信,每個節點都與其他的節點維持若幹TCP長連接配接,内部節點間的通信都是Transport子產品承載的。

軟體設計經常依賴于抽象,IoC就是很好的實作方式,并且在内部實作了對象的建立和管理,ES使用的Guice架構進行子產品化管理。Guice是Google開的的輕量級依賴注入架構。

1.4 子產品結構

在Guice架構下,子產品是由Service和Module類組成的,Service實作業務功能,Module類配置綁定資訊。

AbstractModule是Guice提供的基類,子產品需要從這個類繼承。Module類主要作用用于定義綁定關系,例如:

 protected void configure() {

        bind(GatewayAllocator.class).asEagerSingleton();

        bind(AllocationService.class).toInstance(allocationService);

        bind(ClusterService.class).toInstance(clusterService);

        bind(NodeConnectionsService.class).asEagerSingleton();

        bind(MetaDataCreateIndexService.class).asEagerSingleton();

        bind(MetaDataDeleteIndexService.class).asEagerSingleton();

        bind(MetaDataIndexStateService.class).asEagerSingleton();

        bind(MetaDataMappingService.class).asEagerSingleton();

        bind(MetaDataIndexAliasesService.class).asEagerSingleton();

        bind(MetaDataUpdateSettingsService.class).asEagerSingleton();

        bind(MetaDataIndexTemplateService.class).asEagerSingleton();

        bind(IndexNameExpressionResolver.class).toInstance(indexNameExpressionResolver);

        bind(RoutingService.class).asEagerSingleton();

        bind(DelayedAllocationService.class).asEagerSingleton();

        bind(ShardStateAction.class).asEagerSingleton();

        bind(NodeMappingRefreshAction.class).asEagerSingleton();

        bind(MappingUpdatedAction.class).asEagerSingleton();

        bind(TaskResultsService.class).asEagerSingleton();

        bind(AllocationDeciders.class).toInstance(allocationDeciders);

        bind(ShardsAllocator.class).toInstance(shardsAllocator);

    }

1.5 子產品管理

定義好子產品由ModulesBuilder類統一管理,ModulesBuilder是ES對Guice封裝,内部調用Guice接口,主要對外提供兩個方法。

· add方法:添加建立好的子產品

· createInjector方法:調用Guice.createInjector建立傳回Injector,後續調用Injector擷取相應的Service類的執行個體。

使用ModulesBuilder進行管理的代理示例:

ModulesBuilder modules = new ModulesBuilder();

//Cluster子產品

ClusterModule clusterModule = new ClusterModule();

modules.add(clusterModule);

//....

//建立Injector

Injector injector = modules.createInjector();

setGatewayAllocator(injector.getInstance(GatewayAllocator.class))

子產品化的封裝讓ES易于擴充,插件本身也是子產品,節點啟動時被子產品管理器添加進來。

2. 叢集啟動流程

叢集啟動期間要經曆選舉主節點、主分片、資料恢複等階段,梳理其中的原理和細節,對于解決或避免叢集遇到問題如腦裂、無主、恢複慢、丢資料有很大的作用。

2.1 選取主節點

ES叢集啟動首先從活躍的機器清單中選取一個作為主節點,選主之後的流程由主節點觸發。ES的選主算法是基于Bully算法,主要是對節點的ID進行排序,取ID值最大的節點作為Master,每個節點都運作這個流程。選主的目的是确定唯一的主節點。

節點ID排序選舉算法的條件:

(1)參選數過半,達到quorum(多數)後選出臨時的主節點。

(2)得票數需要過半。某個節點被選為主節點,必須判斷加入它的節點數過半,才确認Master身份。

(3)當檢查到節點離開是,需要判斷目前節點是否過半。如果達不到quorum,則放棄Master,重新加入叢集。如果不這麼做容易産生雙主,俗稱腦裂。

叢集不知道共有多少節點,quorum從配置中讀取,設定配置項:

discovery.zen.minimun_master_node

2.2 選取叢集元資訊

被選出的master和叢集中繼資料資訊的新舊程度沒有關系。它的第一任務是選舉元資訊,讓各節點把各自的存儲的元資訊發送過來,根據_version版本号來确定最新的元資訊,然後把資訊broadcast下去,這樣叢集的所有節點都有最新的元資訊。

叢集元資訊的選舉包括兩個級别:叢集級和索引級。不包含哪個shard存在哪個node節點這種資訊。資訊以節點磁盤存儲為準,需要上報。讀寫流程不經過master節點,master不知道各個shard副本直接的資料差異。HDFS也是類似的機制,block塊資訊依賴于DataNode的上報。

為了叢集的一緻性,參與選舉的元資訊數量需要過半,Master釋出叢集狀态成功的規則也是等待釋出成功的節點數過半。

在選舉過程中,不接受新節點的加入請求,叢集元資訊選舉完畢後,Master釋出首次叢集狀态,然後開始選舉shard級的元資訊。

2.3 allocation過程

選舉shard級元資訊,建構路由表資訊,是在allocation子產品中完成。初始階段,所有的shard都處于unassigned狀态。ES中通過配置設定過程決定哪個分片位于哪個節點,重構内容路由表。此時,首先要做的是配置設定主分片。

1.選主shard

某個主分片(sent[0])是如何配置設定的?

首先配置設定工作都是Master來做的,此時Master不知道主分片在哪,它向叢集的所有節點詢問:sent[0]分片元資訊發送給我,master等待所有請求傳回,正常情況下就有了shard的資訊,然後根據某種政策選出一個分片為主分片。這種詢問量=shard數 X 節點數。所有我們需要控制shard分片數别太大。

考慮哪個分片作為主分片?

ES5.x以下版本通過對比shard級元資訊的版本号來确定。

ES5.x以後開始使用給每個shard設定一個UUID,然後在叢集級的元資訊中記錄哪個shard是最新的,主分片選舉過程是通過叢集級元資訊中記錄“最新主分片清單”來确定主分片的。

如果叢集設定了:

cluster.routing.allocation.enable: none

禁止配置設定分片,叢集仍會強制配置設定分片,設定改選項,叢集重新開機後狀态為Yellow,而非Red。

2.選副shard

主分片選舉完成後,從上一個過程彙總的shard資訊中選擇一個副本作為副分片。如果彙總資訊不存在,則配置設定一個全新副本的操作依賴于延遲配置項:

index.unassigned.node_left.delayed_timeout

2.4 index recovery

分片配置設定成功後進入recovery流程。主shard的recovery不會等待其副分片配置設定成功才開始recovery,它是獨立的流程,隻是副分片的recovery需要主分片恢複完畢才開始。

為什麼需要recovery?

對于主分片來說,可能一些資料沒有flush;對于副分片來說,一是沒有flush,二是主分片寫完了,副分片還沒的及寫,造成主副分片資料不一緻。

1.主shard recovery

由于每次寫操作都會記錄translog,事務日志中記錄了什麼操作和相關的資料。是以将最後一次送出(Lucene的一次送出就是一次fsync刷盤的過程)之後的translog中,建立Lucene索引,如此完成主分片的recovery。

2.副shard recovery

階段1:在主分片所在的節點上,擷取translog保留鎖,從擷取保留鎖開始,會保留translog不受其刷盤清空的影響。然後調用Lucene接口把shard做快照,這是已經刷盤中的分片資料。把這些shard資料複制到副本節點。在階段1完畢前,向副本分片發送告知對方啟動engine,在階段2開始之前,副分片就可以正常處理寫請求。

階段2:對tanslog做快照,這個快照包含階段1開始,到執行translog快照期間的新增索引。将這些translog發送到副分片所在的節點進行重寫。

3. 選主流程

3.1 為什麼使用主從模式

除了主從模式外,另外一種選擇的是分布式哈希模式,可以支援每小時數千個節點的加入和離開,其可以在不了解底層網絡拓撲的異構網絡中工作,查詢響應時間為4到10跳(中轉次數),例如Cassandra就使用這種方案。但是在相穩定的對等網絡中,主從模式會更好。

ES的典型應用場景的另一個簡化是叢集中沒有那麼多節點。通常,節點的數量遠遠小于單個節點能夠維護的連接配接數,并且網絡環境不必經常處理節點的加入和離開,這就是為什麼主從模式更适合ES。

3.2 流程分析

整體流程可以概括:選舉臨時的Master,如果本節點當選,則等待确立Master,如果其他的節點當選,則嘗試加入叢集,然後啟動節點失效偵察。具體如下圖所示:

你想擁有自己的搜尋引擎嗎?-elasticsearch篇

臨時Master選舉過程如下:

(1)ping 所有的節點,擷取節點清單fullPingResponses,ping結果不包括本節點

(2)建構兩個清單。

activeMasters清單:存儲叢集目前活躍Master清單。周遊第一步擷取的所有節點,将每個節點所認為的目前Master節點加入activeMasters清單中(不包括本節點)。在周遊過程中,如果配置了discovery.zen.master_election.ingore_non_master_pings為true(預設為false),而節點又不具備Master資格,則跳過該節點。具體流程如下圖:

你想擁有自己的搜尋引擎嗎?-elasticsearch篇

masterCandidates清單:存儲master候選者清單。周遊第一步擷取清單,去掉不具備Master資格的節點,添加到這個清單中。

(3)如果activeMasters為空,則從masterCandidates中選舉,結果可能選舉成功,也可能選舉失敗。如果不為空,則activeMasters中選擇最合适的作為Master。流程圖如下:

你想擁有自己的搜尋引擎嗎?-elasticsearch篇

從masterCandidates中選主具體細節封裝在ElectMasterService類中

//從MasterCandidate中選主時,首先判斷目前候選數是否達到法定數,否則選主失敗

public boolean hasEnoughCandidates(Collection<MasterCandidate> candidates) {

        //候選者為空,傳回失敗

        if (candidates.isEmpty()) {

            return false;

        }

        //預設值為-1 確定單節點的叢集可以正常選主

        if (minimumMasterNodes < 1) {

            return true;

        return candidates.size() >= minimumMasterNodes;

//當候選數達到額定數後,從候選者中選一個作為Master

public MasterCandidate electMaster(Collection<MasterCandidate> candidates) {

        assert hasEnoughCandidates(candidates);

        List<MasterCandidate> sortedCandidates = new ArrayList<>(candidates);

        //通過自定義的比較函數對候選者節點從小到大排序

        sortedCandidates.sort(MasterCandidate::compare);

        //傳回最新的最為Master

        return sortedCandidates.get(0);

}

4. 網關子產品

gateway子產品負責叢集元資訊的存儲和叢集重新開機時的恢複。

ES中存儲的資料有下面幾種:

· state中繼資料資訊

· index Lucene生成的索引檔案

· translog事務日志

分别對應ES中的資料結構:

· MetaData(叢集層),主要是ClusterUUID、settings、templates等

· IndexMetaData(索引層),主要是numberOfShards、mappings等

· ShardStateMetaData(分片層),主要是version、indexUUID、primary等。

持久化的state不包括某個分片存在于哪個節點這種内容路由資訊,叢集完全重新開機時,依靠gateway的recovery過程重建RoutingTable。當讀取某個文檔時,根據路由算法确定目的的分片後,從RoutingTable中查找分片位于哪個節點,然後将請求轉發到目的節點。

4.1 中繼資料持久化

隻有具備Master資格的節點和Data Node可以持久化叢集狀态。當收到主節點釋出的叢集狀态時,節點判斷元資訊是否發生變化,如果發生變化則将其持久到磁盤中。

GatewayMetaState負責收集叢集狀态,當收到新的叢集狀态時,ClusterApplierService通知全部的applier應用該叢集狀态:

 private void callClusterStateAppliers(ClusterChangedEvent clusterChangedEvent){

//周遊全部的applier,依次調用各子產品對叢集狀态的處理        

     clusterStateAppliers.forEach(applier -> {

            try {

//調用各個子產品實作的applyClusterState

                applier.applyClusterState(clusterChangedEvent);

            } catch (Exception ex) {

                logger.warn("failed to notify ClusterStateApplier", ex);

            }

        });

執行檔案寫入的過程封裝在MetaDataStateFormat中,全局元資訊和索引級元資訊的寫入都執行三個流程:寫臨時檔案、刷盤、move成目标檔案。

 public final void write(final T state, final Path... locations) throws IOException {

        if (locations == null) {

            throw new IllegalArgumentException("Locations must not be null");

        if (locations.length <= 0) {

            throw new IllegalArgumentException("One or more locations required");

        final long maxStateId = findMaxStateId(prefix, locations)+1;

        assert maxStateId >= 0 : "maxStateId must be positive but was: [" + maxStateId + "]";

        final String fileName = prefix + maxStateId + STATE_FILE_EXTENSION;

        Path stateLocation = locations[0].resolve(STATE_DIR_NAME);

        Files.createDirectories(stateLocation);

        final Path tmpStatePath = stateLocation.resolve(fileName + ".tmp");

        final Path finalStatePath = stateLocation.resolve(fileName);

        try {

            final String resourceDesc = "MetaDataStateFormat.write(path=\"" + tmpStatePath + "\")";

            try (OutputStreamIndexOutput out =

                     new OutputStreamIndexOutput(resourceDesc, fileName, Files.newOutputStream(tmpStatePath), BUFFER_SIZE)) {

                CodecUtil.writeHeader(out, STATE_FILE_CODEC, STATE_FILE_VERSION);

                out.writeInt(format.index());

                try (XContentBuilder builder = newXContentBuilder(format, new IndexOutputOutputStream(out) {

                    @Override

                    public void close() throws IOException {

                    } })) {

                    builder.startObject();

                    {

                        toXContent(builder, state);

                    }

                    builder.endObject();

                }

                CodecUtil.writeFooter(out);

            IOUtils.fsync(tmpStatePath, false); // fsync the state file

            Files.move(tmpStatePath, finalStatePath, StandardCopyOption.ATOMIC_MOVE);

            IOUtils.fsync(stateLocation, true);

            for (int i = 1; i < locations.length; i++) {

                stateLocation = locations[i].resolve(STATE_DIR_NAME);

                Files.createDirectories(stateLocation);

                Path tmpPath = stateLocation.resolve(fileName + ".tmp");

                Path finalPath = stateLocation.resolve(fileName);

                try {

                    Files.copy(finalStatePath, tmpPath);

                    // we are on the same FileSystem / Partition here we can do an atomic move

                    Files.move(tmpPath, finalPath, StandardCopyOption.ATOMIC_MOVE);

                    IOUtils.fsync(stateLocation, true); // we just fsync the dir here..

                } finally {

                    Files.deleteIfExists(tmpPath);

        } finally {

            Files.deleteIfExists(tmpStatePath);

        cleanupOldFiles(prefix, fileName, locations);

    }          

4.2 中繼資料恢複

Gateway的recovery負責找到正确的中繼資料,應用到叢集。

目前叢集完成重新開機,達到recovery條件時,進入中繼資料恢複流程,一般情況下,recovery條件由以下三個配置控制。

· gateway.expected_nodes,預期的節點數。加入叢集的節點數(資料節點或具備Master資格的節點)達到這個數量後立即開始gateway的恢複。預設為0

· gateway.recover_after_time,如果沒有達到預期的節點數量,則恢複過程将等待配置的時間,再嘗試恢複,預設為5分鐘

· gateway.recover_after_nodes,隻要配置數量的節點(資料節點或具備Master資格的節點)加入叢集就可以開始恢複

當叢集級、索引級中繼資料選舉完畢後,執行submitStateUpdateTask送出一個source的任務,觸發擷取shard級中繼資料的操作,這個Fetch過程是異步的,根據叢集分片數量規模,Fetch過程可能比較長,然後submit任務就結束,gateway流程結束。

4.3 選舉叢集級和索引級中繼資料

進入recovery主要流程:代碼實作GateWay#performStateRecovery中;首先向Master資格的節點發起請求,擷取他們的存儲的中繼資料

//具有Master資格的節點清單

String[] nodesIds = clusterService.state().nodes().getMasterNodes().keys().toArray(String.class);

//發送擷取請求并等待結果

        TransportNodesListGatewayMetaState.NodesGatewayMetaState nodesState = listGatewayMetaState.list(nodesIds, null).actionGet();

選舉叢集級中繼資料代碼如下:

public void performStateRecovery(final GatewayStateRecoveredListener listener) throws GatewayException {

    //周遊請求的所有節點

    for (TransportNodesListGatewayMetaState.NodeGatewayMetaState nodeState : nodesState.getNodes()) {

            if (nodeState.metaData() == null) {

                continue;

            found++;

    //根據元資訊中記錄的版本号選舉元資訊

            if (electedGlobalState == null) {

                electedGlobalState = nodeState.metaData();

            } else if (nodeState.metaData().version() > electedGlobalState.version()) {

            for (ObjectCursor<IndexMetaData> cursor : nodeState.metaData().indices().values()) {

                indices.addTo(cursor.value.getIndex(), 1);

選舉索引級中繼資料代碼如下:

    final Object[] keys = indices.keys;

    //周遊叢集中的全部索引

        for (int i = 0; i < keys.length; i++) {

            if (keys[i] != null) {

                Index index = (Index) keys[i];

                IndexMetaData electedIndexMetaData = null;

                int indexMetaDataCount = 0;

                //周遊請求的全部節點,對特定索引選擇版本号最高的作為該索引的中繼資料

                for (TransportNodesListGatewayMetaState.NodeGatewayMetaState nodeState : nodesState.getNodes()) {

                    if (nodeState.metaData() == null) {

                        continue;

                    IndexMetaData indexMetaData = nodeState.metaData().index(index);

                    if (indexMetaData == null) {

                    if (electedIndexMetaData == null) {

                        electedIndexMetaData = indexMetaData;

                    } else if (indexMetaData.getVersion() > electedIndexMetaData.getVersion()) {

                    indexMetaDataCount++;

5. 叢集子產品

叢集子產品封裝了叢集層面要執行的任務,把分片配置設定給節點屬于叢集層面的工作,在節點間遷移分片保持資料平衡,叢集健康、叢集級元資訊管理,以及節點管理都屬于叢集層面的工作。

5.1 叢集狀态

叢集狀态在ES中封裝為ClusterState類。可以通過_cluster/state API來擷取叢集狀态

curl -X GET "localhost:9200/_cluster/state/"

響應資訊中提供了叢集名稱、叢集狀态的總壓縮大小(下發到資料節點時被壓縮的)和叢集狀态本身,請求時可以通過設定過濾器來擷取特定内容。

預設請求下,Coordinating node在收到這個請求後,會把請求路由到Master node上執行,確定擷取最新叢集狀态。可以通過在請求中添加"local=true"參數,讓接受請求的節點傳回本地的叢集狀态。

5.2 内部封裝和實作

MasterService和ClusterApplierService分布負責運作任務和應用任務産生的叢集

5.2.1 MasterService

MasterService類負責叢集任務管理、運作等工作。其對外提供送出任務接口,内部維護一個線程池運作這些任務,對外提供主要接口如下:

方法 簡介
numberOfPendingTasks 傳回待執行的任務數量
pendingTasks 傳回待執行的任務清單
submitStateUpdateTasks 送出任務叢集

主要資料成員如下:

成員
clusterStatePublisher 釋出叢集任務的子產品
clusterStateSupplier 存儲叢集狀态
slowTaskLoggingThreshold 叢集任務慢執行的檢測
threadPoolExecutor 執行叢集任務的線程池
taskBatcher 管理、執行送出的任務,通過submitStateUpdateTask方法送出調用内部類Batcher的送出方法

5.2.2 ClusterApplierService

ClusterApplierService類負責管理需要對叢集任務進行處理的子產品(Applier)和監聽器(Listener),以及通知各個Applier應用叢集狀态,其對外提供接受叢集狀态的接口,當傳輸子產品接受到叢集狀态時,調用這個接口将叢集狀态傳遞過來,内部維護一個線程池用于應用叢集狀态。對外提供主要接口如下:

addStateApplier 添加一個叢集狀态的處理器
addListener 添加一個叢集狀态的監聽器
removeApplier 删除一個叢集狀态的處理器
removeListener 删除一個叢集狀态的監聽器
state 傳回叢集狀态
onNewClusterState 收到新的叢集狀态
submitStateUpdateTask 在新的線程池應用叢集狀态

主要資料成員如下表示:

clusterSettings 儲存要通知的叢集狀态應用處理器
clusterStateListeners 儲存叢集狀态監聽器
儲存最後的叢集狀态
應用叢集任務叢集的線程池

5.3 内部子產品如何送出任務

内部子產品通過clusterService.submitStateUpdateTask來送出一個任務叢集。

· ClusterStateTaskListener:送出任務時實作一些回調函數,例如對任務處理失敗、叢集狀态處理完畢時的處理。

· CLusterStateTaskExecutor:主要定義要執行的任務。每個任務在執行時會傳入目前叢集狀态,任務執行完畢傳回新産生的叢集狀态,如果沒有産生新的叢集狀态,則傳回原叢集狀态執行個體。

· ClusterStateTaskConfig:任務的配置資訊,包括逾時和優先級。

clusterService.submitStateUpdateTask("allocation indices ",new ClusterStateUpdateTask{

    //實作要執行的具體任務,任務傳回新的叢集狀态

    public ClusterState execute(ClusterState currentState) {

    //任務執行失敗的回調

    public void onFailure(String source, Exception e){

    //叢集狀态處理完畢的回調,當叢集狀态已經被全部的Applier和Listener處理完成時調用

    public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {

})

5.4 叢集狀态釋出

釋出叢集狀态是一個分布式事務操作,分布式事務需要實作原子性:要麼所有參與者都送出事務,要麼都取消事務。ES使用二段送出來實作分布式事務。二段送出可以避免失敗復原,其基本過程是:把資訊發下去,但是不應用,如果得到多數節點确認,則再發一個請求出去要求節點應用。

ES實作二段送出與标準二段送出有些差別,釋出叢集狀态到參與者的數量并非定義為全部,而是多數節點成功就算成功。多數的定義取決于配置項:

discovery.zen.minimum_master_nodes

兩個階段過程如下:

· 釋出階段:釋出叢集狀态,等待響應

· 送出階段:收到的響應數量大于minimum_master_nodes數量,發送commit請求。

主節點吧叢集狀态發下去,節點收到後不應用,當節點在discovery.zen.commit_timeout逾時時間内,收到節點的确認數量達到discovery.zen.minimum_master_nodes-1(去掉本節點),主節點開始發送送出請求,如果節點discovery.zen.commit_timeout逾時時間内沒有收到主節點送出請求,則拒絕該叢集狀态。當節點收到節點的送出請求後,開始應用叢集狀态。主節點等待響應,直到收到全部響應,整個流程釋出介紹。

6. 總結

由于篇幅有限,本篇檔案簡單給大家介紹elasticsearch叢集基本概念、elasticsearch分布式架構中一些子產品的設計以及核心源碼的分析,為自己建構一個簡單的分布式搜尋系統提供一些參考方向,不再需要将過多的精力放在日志中心本身的高可用性和查詢優化上,可以無感覺的增加或減少叢集的節點,進而做到更高效的進行日志的業務開發工作。