天天看點

小米Elasticsearch 服務化實踐

轉載自 :小米運維(公衆号 ID:MI-SRE)

摘要

使用 Elasticsearch 做日志檢索、分析服務已成為目前網際網路公司首選工具之一了。那麼怎麼與公司内部系統(如資料采集、資料 schema 管理、資料權限等系統)打通,怎麼提供一套易用的内部 Elasticsearch 系統便成為了首先需要解決的問題。本文将介紹小米内部 Elasticsearch 服務化之路的演進。

資料鍊路圖

首先給大家介紹一下我米的資料鍊路圖(老版)

小米Elasticsearch 服務化實踐

簡單劃分了四層,資料采集、資料控制與傳輸、資料存儲與消息隊列、資料應用層。

整個資料鍊路中,由資料工場負責源資料注冊、schema 定義,權限控制。資料注冊後會更新 scribe 白名單(重新整理 scribe 配置),建立 hdfs 目錄,hive 表建立、授權等。還有一個重要功能就是資料 schema 定義。包括 schema 序列化與反序列化協定,字段定義等等。業務方在資料工場注冊好資料之後就可以通過 agent 打資料了。一般常用的方式就是 xlogger。xlogger 是我們内部開發的一個 java log sdk,支援 scribe 協定,能夠通過 xlogger 直接往 scribe-agent 或者 scribe-server 打資料。scribe-server 根據資料工場定義的規則,将資料寫入 hdfs 或者 kafka。然後資料應用層開始通過各種工具進行分析。

怎麼将 elasticsearch 服務無縫對接到公司成熟的資料鍊路中?

我們主要考慮以下幾個因素

  1. Elasticsearch 輸入源從哪裡對接最合适?
  2. 怎麼結合資料工場定義的 schema 去做反序列化?
  3. 怎麼給資料做 ETL?
  4. 怎麼友善管理和監控?

對與第 1 點,我們選擇了對接 kafka 和 hdfs。其中 kafka 走實時資料攝入,hdfs 走離線資料攝入。

綜合 2、3、4 點,我們隻有兩種方案可選,一是在現有開源元件上做二次開發,與我們内部資料工場打通,能通過資料工場定義的 schema 結構去做反序列化個 ETL。二是自研類似 logstash 的中間件。

最終我們選擇了自研 kafka2es 元件(用 java 開發)。主要實作第 2 和第 3 點。當定位清楚了,那麼實作起來,也不複雜。第一版做的非常簡單,通過 pom 去把業務在資料工場定義的 schema 的反序列化類引入進來,實作一個通用的 ETL 類(也就是存粹把從 kafka 讀出來的 byte 數組的反序列化出來,再轉成 json 結構寫入 elasticsearch),然後再留一個可配置的空間,讓業務自己實作 ETL 邏輯,我們 kafka2es 在 elk 時通過動态加載的方式,調用業務實作的 ETL 類做處理。監控方面就是在各個環節加計數:read kafka 條數、write es 條數、write 失敗條數、etl 失敗條數,将點打入 falcon 做監控告警。

具體實作

我們将讀 kafka 和寫 es 的邏輯全部封裝好(這裡不較長的描述了,代碼都非常簡單),當業務接入的時候隻需要準備一個配置檔案和一個 ETL 類,提到我們 kafka2es 的項目中即可。如果不需要特殊處理的資料,則隻需一份配置檔案即可,樣例如下:

server.conf

kafka_topic_name=xxx                                #kafka topic 名稱
kafka_zk_url=xxx                                    #kafka zk 位址
kafka_consumer_groupid=xxx                          #kafka 消費者 group id
decoder_class=xxx                                   #資料分序列化類
es_url=xxx                                          #elasticsearch 叢集位址
es_cluster_name=es-test                             #elasticsearch 叢集名稱
es_index_name=xxx                                   #寫入索引名稱
es_index_name_suffix_format=yyyy.MM.dd              #索引日期字尾規則,如按天建索引則是yyyy.MM.dd 按月建索引則是yyyy.MM
es_index_type_name=doc                              #索引type名,統一預設使用doc
es_authtoken=xxx                                    #權限token,通過賬号密碼加密得到
parser_class=com.xiaomi.data.CommonParser           #ETL類 一般為業務自行實作,主要用于資料清洗和過濾
thread_num=1                                        #處理線程數
es_index_pipeline=test-pipeline                     #pipeline,如nginx日志,apache日志通過pipeline處理更為簡單,預設所有data節點都開了ingest功能,master節點和client節點都不開ingest功能           

以上配置分為三個部分:kafka 相關,elasticsearch 叢集相關,ETL 相關。整個 kafka2es 項目也封裝了 三個部分,寫讀 kafka,做 ETL,寫 elasticsearch。

ETL 類怎麼抽象出來讓業務自行實作?

public abstract class BaseParser {
/**
 * @param log byte[]
 * @return json string
 */
public abstract  String parser(byte[] log);
}           

通過定義個一個抽象類 BaseParser,抽象方法 parser,業務 ETL 類隻需要基礎 BaseParser,實作 parser,parser 方法的參數是從 kafka 讀出來的消息體 byte[],傳回值必須是 json string。

可以說是非常簡單,整個資料流就這樣跑通了(目前主要做了實時接入,對接 kafka)。

小米Elasticsearch 服務化實踐

怎麼實作 elasticsearch 的多租戶權限管理?權限管理系統賬号怎麼與公司内部賬号系統打通?

調研了幾個 es 權限管理插件,如: x-pack 、search-guard、elasticsearch-http-user-auth 等。x-pack 為商用,放棄;search-guard 實作略複雜,從可維護性和二次開發上看都不太滿意。elasticsearch-http-user-auth 非常非常簡潔,但是隻支援 http 鑒權,沒有 transport 鑒權。

github 位址

search-guard:https://github.com/floragunncom/search-guard

elasticsearch-http-user-auth:https://github.com/elasticfence/elasticsearch-http-user-auth

于是我們還是決定自己寫一個:es-authority-manager-mi

主要功能實作了:

  1. 索引的讀寫權限管理(http+transport)
  2. 賬号體系與公司内部 kerberos 賬号打通
  3. 所有使用者行為記錄

下面我簡單分析一下鑒權插件的實作 (elasticsearch-5.6.2):

首先看插件的主類

public class AuthorityManagerMiPlugin extends Plugin implements ActionPlugin, NetworkPlugin {

    public AuthorityManagerMiPlugin(final Settings settings) {
        //插件初始化
    }

    //restful 接口注冊, AuthorityManagerAction實作插件api接口
    @Override
    public List<RestHandler> getRestHandlers(Settings settings, RestController restController, ClusterSettings clusterSettings,
                                         IndexScopedSettings indexScopedSettings, SettingsFilter settingsFilter,
                                         IndexNameExpressionResolver indexNameExpressionResolver, Supplier<DiscoveryNodes> nodesInCluster) {
        final List<RestHandler> handlers = new ArrayList<RestHandler>(1);
        if (!AMMPluginIsDisabled) {
            handlers.add(new AuthorityManagerAction(settings, restController));
        }
        return handlers;
    }

    //transport攔截器注冊
    @Override
    public List<Class<? extends ActionFilter>> getActionFilters() {
        List<Class<? extends ActionFilter>> filters = new ArrayList<>(1);
        if (!AMMPluginIsDisabled) {
            filters.add(AuthorityManagerFilter.class);
        }
        return filters;
    }

    //restful攔截器注冊
    @Override
    public UnaryOperator<RestHandler> getRestHandlerWrapper(final ThreadContext threadContext) {

        if (!AMMPluginIsDisabled) {
            if (AMMiRestHandler == null) {
                AMMiLogger.warn("AMMiRestHandler is null");
            }
            return (rh) -> AMMiRestHandler.wrap(rh);
        }
        return null;
    }
//.......
}           

插件 resful api 的實作:

public class AuthorityManagerAction extends BaseRestHandler {

    @Inject
    public AuthorityManagerAction(final Settings settings, final RestController controller) {
        super(settings);
        controller.registerHandler(GET, "/_authority_manager/auth", this);
        controller.registerHandler(POST, "/_authority_manager/auth", this);
    }

    @Override
    protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) throws IOException {
    }
//......
}           

transport 攔截器實作

public class AuthorityManagerFilter implements ActionFilter {

    public AuthorityManagerFilter(Settings settings, Client client, ClusterService clusterService, IndexNameExpressionResolver resolver) {
     //....
    }

    @Override
    public int order() {
        return Integer.MIN_VALUE;
    }

    @Override
    public void apply(Task task, final String action, final ActionRequest request, final ActionListener listener, final ActionFilterChain chain) {
        //權限校驗
    }
}           

restful 攔截器實作

public class AuthorityManagerRestFilter {

    public AuthorityManagerRestFilter(Settings settings) {
       //....
    }
    public RestHandler wrap(RestHandler original) {
        return new RestHandler() {
            //....
        }
    }
}           

以上就是 es 權限管理插件實作的整體結構與過程,細節上大家自行看看對應繼承類和實作類的源碼即可。

不過上面僅僅是實作了初步的鑒權相關的功能,但是既然所有請求都能攔截到了,那麼我們也可以做更進一步的控制了:

如:

  1. 查詢請求的 qps 限制(search 配額、請求排隊等)
  2. 查詢的慢日志分析
  3. 叢集高負載情況下的自我保護機制
  4. 多租戶資源隔離機制等等

    以上也是我們未來規劃。

kafka2es 進化版 ->es-sink

kafka2es 在使用者量劇增的情況下怎麼改進,怎麼進一步減少接入成本以及運維成本。怎麼與公司資料鍊路更新去公共進化?

首先來看下面新的資料鍊路圖:

小米Elasticsearch 服務化實踐

兩個新名詞:lcs talos

lcs 和 talos 是我米自研的兩個元件,簡單來說,lcs 對标 scribe,talos 對标 kafka。具體介紹可參考如下文章:

淺析什麼是小米資料流服務

在這一套新的資料鍊路中,我們将 kafka2es 所有的功能內建到小米新版資料流中,作為 es-sink 提供服務。于是,新的架構變成了這樣:

小米Elasticsearch 服務化實踐

es-sink 還是繼承了 kafka2es 的所有功能

  1. 關聯資料工場 schema 定義,反序列化資料
  2. 自定義 ETL 處理
  3. 監控

其中 es-sink 與 kafka2es 最大的差別是通過 spark streaming 實作。而 kafka2es 還是獨立的 java 項目,每次業務接入還需要業務送出配置檔案與 ETL 代碼,我們 review 後需要部署該服務,但是 es-sink 的實作方式可以讓業務注冊好索引之後,通過管理系統前端配置 es-sink,對應的 spark streaming 就起來了,還不需要關心資源問題(資源動态調整),很大程度減少了我們業務支援和運維的工作。

總結

本文主要介紹了資料攝傳入連結路與公司資料鍊路整合以及索引權限管理插件的實作。可能大家也存在一些疑問,比如說:為什麼我們不用社群開源的 filebeat,從用戶端直接打到 elasticsearch,反而增加了好幾個中間層的元件,這樣導緻架構複雜性的增加以及資料延時的增大?

要打造内部公共的 Elasticsearch 服務,責權分離一定要搞清楚:如果把直接寫 Elasticsearch 的權限放開給業務方,帶來的安全隐患是非常大的。業務的流量怎麼控制?并發寫入怎麼控制?出錯如何定位?怎麼拿到業務的序列化類去反序列化資料等等。這些問題都可能是緻命的。是以必須在中間加一次消息隊列做緩沖。這樣來做到與業務用戶端的解耦。業務方隻需要按照公司統一标準采集資料,寫入消息隊列(kafka/talos),這時候業務用戶端的使命已經完成了。而我們,則負責将資料消費出來,經過業務自定義的 ETL 邏輯,寫入 Elasticsearch。這個過程由我們控制,優勢由以下幾點:

1:資料接入 Elasticsearch 不需要改造業務現有架構(依然走公司統一資料收集方式)

2:減少 client 的并發連接配接;Elasticsearch 不管寫入還是搜尋,都是配了線程池。并發連接配接過多,非常容易把寫入線程池打滿,導緻拒絕請求

3:提高寫入效率;通過 transport bulk 模式實作寫入,相比 filebeat 的 http 方式從寫入效率上來說,有比較大的提升

4:增強可控性;當叢集資料量非常大的時候,如果要對叢集做更新或者重新開機,如果同時還在大量寫資料,分片恢複的時間非常慢長。但是可以與大頭業務協商,暫停寫入(隻需要把服務端的資料攝入停止即可,不用業務用戶端操作),操作叢集,恢複寫入

5:叢集更新完全不用業務方做适配

6:資料攝入監控很好收斂,故障定位非常明确

以上雖然表面上增加了系統的複雜度,和一定程度的寫入延遲,但是都是可接受的。對于複雜度并沒有過多的增大,由于與内部資料流的對接,對業務來說反而更加簡單。雖然實時性犧牲了一些(整條鍊路資料延時在 1 分鐘以内),但還是可接受的。

另外就是權限控制,權限控制其實是分為兩個方面的,一、資料讀寫的權限控制(安全與隐私),二、資源權限控制(如:并發讀寫量控制);在 elasticsearch 中,讀寫權限控制容易做,資源權限控制并不太好做,我們也正在研究中,歡迎大家一起交流。

參考文獻

https://www.elastic.co

https://github.com/elastic/elasticsearch

繼續閱讀